This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new b594483b22 IGNITE-22214 Meta storage idempotent invokes: implement
idempotent cache cleanup logic (#3830)
b594483b22 is described below
commit b594483b22d473ed55af7552757c32e54d445175
Author: Alexander Lapin <[email protected]>
AuthorDate: Thu Jun 6 17:35:26 2024 +0300
IGNITE-22214 Meta storage idempotent invokes: implement idempotent cache
cleanup logic (#3830)
---
.../DistributionZoneRebalanceEngineTest.java | 16 ++++-
.../RebalanceUtilUpdateAssignmentsTest.java | 17 +++++-
.../impl/ItIdempotentCommandCacheTest.java | 64 ++++++++++++++++++--
.../impl/ItMetaStorageManagerImplTest.java | 13 +++-
.../ItMetaStorageMultipleNodesAbstractTest.java | 6 +-
.../impl/ItMetaStorageServicePersistenceTest.java | 9 ++-
.../metastorage/impl/ItMetaStorageServiceTest.java | 13 +++-
.../metastorage/impl/ItMetaStorageWatchTest.java | 6 +-
.../server/raft/ItMetaStorageRaftGroupTest.java | 41 +++++++++++--
.../metastorage/impl/MetaStorageManagerImpl.java | 69 ++++++++++++++++++++--
.../server/raft/MetaStorageListener.java | 29 ++++++++-
.../server/raft/MetaStorageWriteHandler.java | 61 ++++++++++++++++++-
.../impl/IdempotentCommandCacheTest.java | 28 ++++++---
.../MetaStorageDeployWatchesCorrectnessTest.java | 9 ++-
.../impl/MetaStorageManagerRecoveryTest.java | 9 ++-
.../impl/StandaloneMetaStorageManager.java | 19 +++++-
.../MultiActorPlacementDriverTest.java | 9 ++-
.../PlacementDriverManagerTest.java | 9 ++-
.../service/ItAbstractListenerSnapshotTest.java | 2 +-
.../ItDistributedConfigurationPropertiesTest.java | 6 +-
.../ItDistributedConfigurationStorageTest.java | 6 +-
.../runner/app/ItIgniteNodeRestartTest.java | 9 ++-
.../org/apache/ignite/internal/app/IgniteImpl.java | 8 ++-
.../rebalance/ItRebalanceDistributedTest.java | 8 ++-
24 files changed, 410 insertions(+), 56 deletions(-)
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
index 445fa9cfd9..cc301bce76 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
@@ -30,6 +30,7 @@ import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.REBALANCE_SCHEDULER_POOL_SIZE;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey;
+import static
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
import static org.apache.ignite.internal.table.TableTestUtils.getTableIdStrict;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
@@ -72,6 +73,8 @@ import org.apache.ignite.internal.affinity.Assignments;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.commands.ColumnParams;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
import org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil;
import org.apache.ignite.internal.distributionzones.Node;
@@ -100,6 +103,7 @@ import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.raft.service.CommandClosure;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.TablePartitionId;
@@ -112,10 +116,12 @@ import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
/**
* Tests the distribution zone dataNodes watch listener in {@link
DistributionZoneRebalanceEngine}.
*/
+@ExtendWith(ConfigurationExtension.class)
public class DistributionZoneRebalanceEngineTest extends IgniteAbstractTest {
private static final String ZONE_NAME_0 = "zone0";
@@ -141,6 +147,9 @@ public class DistributionZoneRebalanceEngineTest extends
IgniteAbstractTest {
private ScheduledExecutorService rebalanceScheduler;
+ @InjectConfiguration
+ private RaftConfiguration raftConfiguration;
+
@BeforeEach
public void setUp() {
String nodeName = "test";
@@ -180,7 +189,12 @@ public class DistributionZoneRebalanceEngineTest extends
IgniteAbstractTest {
keyValueStorage = spy(new SimpleInMemoryKeyValueStorage(nodeName));
- MetaStorageListener metaStorageListener = new
MetaStorageListener(keyValueStorage, mock(ClusterTimeImpl.class));
+ MetaStorageListener metaStorageListener = new MetaStorageListener(
+ keyValueStorage,
+ mock(ClusterTimeImpl.class),
+ raftConfiguration.retryTimeout(),
+ completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
+ );
RaftGroupService metaStorageService = mock(RaftGroupService.class);
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java
index 8e98c825c2..af146edfcc 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java
@@ -17,10 +17,12 @@
package org.apache.ignite.internal.distributionzones.rebalance;
+import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toSet;
import static
org.apache.ignite.internal.affinity.AffinityUtils.calculateAssignmentForPartition;
import static org.apache.ignite.internal.affinity.Assignments.toBytes;
import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
+import static
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -42,6 +44,8 @@ import org.apache.ignite.internal.affinity.Assignment;
import org.apache.ignite.internal.affinity.Assignments;
import
org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.logger.IgniteLogger;
@@ -59,6 +63,7 @@ import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.raft.service.CommandClosure;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.TablePartitionId;
@@ -76,7 +81,7 @@ import org.mockito.quality.Strictness;
/**
* Tests for updating assignment in the meta storage.
*/
-@ExtendWith(MockitoExtension.class)
+@ExtendWith({MockitoExtension.class, ConfigurationExtension.class})
@MockitoSettings(strictness = Strictness.LENIENT)
public class RebalanceUtilUpdateAssignmentsTest extends IgniteAbstractTest {
private static final IgniteLogger LOG =
Loggers.forClass(RebalanceUtilUpdateAssignmentsTest.class);
@@ -99,6 +104,9 @@ public class RebalanceUtilUpdateAssignmentsTest extends
IgniteAbstractTest {
DEFAULT_STORAGE_PROFILE
);
+ @InjectConfiguration
+ private RaftConfiguration raftConfiguration;
+
private static final int partNum = 2;
private static final int replicas = 2;
@@ -122,7 +130,12 @@ public class RebalanceUtilUpdateAssignmentsTest extends
IgniteAbstractTest {
keyValueStorage = spy(new SimpleInMemoryKeyValueStorage("test"));
- MetaStorageListener metaStorageListener = new
MetaStorageListener(keyValueStorage, mock(ClusterTimeImpl.class));
+ MetaStorageListener metaStorageListener = new MetaStorageListener(
+ keyValueStorage,
+ mock(ClusterTimeImpl.class),
+ raftConfiguration.retryTimeout(),
+ completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
+ );
RaftGroupService metaStorageService = mock(RaftGroupService.class);
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
index 66c0610155..453a80eca4 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
@@ -17,7 +17,9 @@
package org.apache.ignite.internal.metastorage.impl;
+import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toSet;
+import static
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
import static org.apache.ignite.internal.metastorage.dsl.Operations.ops;
import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
@@ -30,6 +32,7 @@ import static
org.apache.ignite.internal.util.IgniteUtils.startAsync;
import static org.apache.ignite.internal.util.IgniteUtils.stopAsync;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -51,8 +54,12 @@ import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopolog
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.failure.NoOpFailureProcessor;
+import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.hlc.ClockServiceImpl;
+import org.apache.ignite.internal.hlc.ClockWaiter;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.manager.ComponentContext;
@@ -114,8 +121,9 @@ public class ItIdempotentCommandCacheTest extends
IgniteAbstractTest {
private static final int YIELD_RESULT = 10;
private static final int ANOTHER_YIELD_RESULT = 20;
- @InjectConfiguration("mock.responseTimeout = 100")
+ @InjectConfiguration("mock.retryTimeout = 10000")
private RaftConfiguration raftConfiguration;
+
@InjectConfiguration("mock.idleSyncTimeInterval = 100")
private MetaStorageConfiguration metaStorageConfiguration;
@@ -132,6 +140,10 @@ public class ItIdempotentCommandCacheTest extends
IgniteAbstractTest {
ClusterManagementGroupManager cmgManager;
+ ClockWaiter clockWaiter;
+
+ ClockService clockService;
+
Node(
TestInfo testInfo,
RaftConfiguration raftConfiguration,
@@ -184,7 +196,17 @@ public class ItIdempotentCommandCacheTest extends
IgniteAbstractTest {
clock,
topologyAwareRaftGroupServiceFactory,
new NoOpMetricManager(),
- metaStorageConfiguration
+ metaStorageConfiguration,
+ raftConfiguration.retryTimeout(),
+ completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
+ );
+
+ clockWaiter = new ClockWaiter(clusterService.nodeName(), clock);
+
+ clockService = new ClockServiceImpl(
+ clock,
+ clockWaiter,
+ () -> TEST_MAX_CLOCK_SKEW_MILLIS
);
}
@@ -193,7 +215,10 @@ public class ItIdempotentCommandCacheTest extends
IgniteAbstractTest {
when(cmgManager.metaStorageNodes()).thenReturn(metaStorageNodesFut);
}
- assertThat(startAsync(new ComponentContext(), clusterService,
raftManager, metaStorageManager), willCompleteSuccessfully());
+ assertThat(
+ startAsync(new ComponentContext(), clusterService,
raftManager, metaStorageManager, clockWaiter),
+ willCompleteSuccessfully()
+ );
}
void deployWatches() {
@@ -201,7 +226,7 @@ public class ItIdempotentCommandCacheTest extends
IgniteAbstractTest {
}
void stop() throws Exception {
- List<IgniteComponent> components = List.of(metaStorageManager,
raftManager, clusterService);
+ List<IgniteComponent> components = List.of(clockWaiter,
metaStorageManager, raftManager, clusterService);
closeAll(Stream.concat(
components.stream().map(c -> c::beforeNodeStop),
@@ -364,6 +389,8 @@ public class ItIdempotentCommandCacheTest extends
IgniteAbstractTest {
// Restart cluster.
startCluster(testInfo);
+ long timestampAfterRestartPhysicalLong =
nodes.get(0).clockService.now().getPhysical();
+
leader = leader(raftClient());
// Run same idempotent command one more time and check that condition
wasn't re-evaluated, but was retrieved from the cache instead.
@@ -374,9 +401,36 @@ public class ItIdempotentCommandCacheTest extends
IgniteAbstractTest {
assertTrue((Boolean) commandProcessingResult2);
assertTrue(leader.checkValueInStorage(TEST_KEY.bytes(),
TEST_VALUE));
} else {
- assertEquals(YIELD_RESULT, ((StatementResult)
commandProcessingResult).getAsInt());
+ assertEquals(YIELD_RESULT, ((StatementResult)
commandProcessingResult2).getAsInt());
assertTrue(leader.checkValueInStorage(TEST_KEY_2.bytes(),
TEST_VALUE_2));
}
+
+ for (Node node : nodes) {
+ assertThat(node.clockService.waitFor(
+ new HybridTimestamp(
+ timestampAfterRestartPhysicalLong +
raftConfiguration.retryTimeout().value()
+ + node.clockService.maxClockSkewMillis(),
+ 0
+ )
+ ), willCompleteSuccessfully());
+ }
+
+ for (Node node : nodes) {
+ node.metaStorageManager.evictIdempotentCommandsCache();
+ }
+
+
+ // Run same idempotent command one more time and check that condition
**was** re-evaluated and not retrieved from the cache.
+ CompletableFuture<Object> commandProcessingResultFuture3 =
raftClient().run(idempotentCommand);
+ assertThat(commandProcessingResultFuture3, willCompleteSuccessfully());
+ Object commandProcessingResult3 = commandProcessingResultFuture3.get();
+ if (idempotentCommand instanceof InvokeCommand) {
+ assertFalse((Boolean) commandProcessingResult3);
+ assertTrue(leader.checkValueInStorage(TEST_KEY.bytes(),
ANOTHER_VALUE));
+ } else {
+ assertEquals(ANOTHER_YIELD_RESULT, ((StatementResult)
commandProcessingResult3).getAsInt());
+ assertTrue(leader.checkValueInStorage(TEST_KEY_2.bytes(),
ANOTHER_VALUE_2));
+ }
}
private Node leader(RaftGroupService raftClient) {
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
index ab273a4d70..024087ba0b 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.metastorage.impl;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toList;
+import static
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
import static
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.clusterService;
import static
org.apache.ignite.internal.testframework.flow.TestFlowUtils.subscribeToList;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast;
@@ -99,10 +100,12 @@ public class ItMetaStorageManagerImplTest extends
IgniteAbstractTest {
private MetaStorageManagerImpl metaStorageManager;
+ @InjectConfiguration
+ private RaftConfiguration raftConfiguration;
+
@BeforeEach
void setUp(
TestInfo testInfo,
- @InjectConfiguration RaftConfiguration raftConfiguration,
@InjectConfiguration("mock.idleSyncTimeInterval = 100")
MetaStorageConfiguration metaStorageConfiguration
) {
var addr = new NetworkAddress("localhost", 10_000);
@@ -143,7 +146,9 @@ public class ItMetaStorageManagerImplTest extends
IgniteAbstractTest {
clock,
topologyAwareRaftGroupServiceFactory,
new NoOpMetricManager(),
- metaStorageConfiguration
+ metaStorageConfiguration,
+ raftConfiguration.retryTimeout(),
+ completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
);
assertThat(
@@ -230,7 +235,9 @@ public class ItMetaStorageManagerImplTest extends
IgniteAbstractTest {
storage,
new HybridClockImpl(),
mock(TopologyAwareRaftGroupServiceFactory.class),
- new NoOpMetricManager()
+ new NoOpMetricManager(),
+ raftConfiguration.retryTimeout(),
+ completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
);
assertThat(metaStorageManager.stopAsync(new ComponentContext()),
willCompleteSuccessfully());
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
index 5e55468fb1..b565090691 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
@@ -18,7 +18,9 @@
package org.apache.ignite.internal.metastorage.impl;
import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toSet;
+import static
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.revision;
import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
@@ -202,7 +204,9 @@ public abstract class
ItMetaStorageMultipleNodesAbstractTest extends IgniteAbstr
clock,
topologyAwareRaftGroupServiceFactory,
new NoOpMetricManager(),
- metaStorageConfiguration
+ metaStorageConfiguration,
+ raftConfiguration.retryTimeout(),
+ completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
);
deployWatchesFut = metaStorageManager.deployWatches();
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java
index d73d74e6b9..ac10b1c626 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.metastorage.impl;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.nio.charset.StandardCharsets;
@@ -158,7 +160,12 @@ public class ItMetaStorageServicePersistenceTest extends
ItAbstractListenerSnaps
return s;
});
- return new MetaStorageListener(storage, new ClusterTimeImpl(nodeName,
new IgniteSpinBusyLock(), new HybridClockImpl()));
+ return new MetaStorageListener(
+ storage,
+ new ClusterTimeImpl(nodeName, new IgniteSpinBusyLock(), new
HybridClockImpl()),
+ raftConfiguration.retryTimeout(),
+ completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
+ );
}
/** {@inheritDoc} */
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
index ab00dedab7..36332b02a9 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
@@ -17,8 +17,10 @@
package org.apache.ignite.internal.metastorage.impl;
+import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toUnmodifiableSet;
+import static
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.and;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.or;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.revision;
@@ -186,8 +188,11 @@ public class ItMetaStorageServiceTest extends
BaseIgniteAbstractTest {
private MetaStorageService metaStorageService;
+ private RaftConfiguration raftConfiguration;
+
Node(ClusterService clusterService, RaftConfiguration
raftConfiguration, Path dataPath) {
this.clusterService = clusterService;
+ this.raftConfiguration = raftConfiguration;
HybridClock clock = new HybridClockImpl();
@@ -197,7 +202,6 @@ public class ItMetaStorageServiceTest extends
BaseIgniteAbstractTest {
dataPath.resolve(name()),
clock
);
-
this.clusterTime = new ClusterTimeImpl(clusterService.nodeName(),
new IgniteSpinBusyLock(), clock);
this.mockStorage = mock(KeyValueStorage.class);
@@ -234,7 +238,12 @@ public class ItMetaStorageServiceTest extends
BaseIgniteAbstractTest {
assert peer != null;
- var listener = new MetaStorageListener(mockStorage, clusterTime);
+ var listener = new MetaStorageListener(
+ mockStorage,
+ clusterTime,
+ raftConfiguration.retryTimeout(),
+ completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
+ );
var raftNodeId = new RaftNodeId(MetastorageGroupId.INSTANCE, peer);
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
index 583d08488f..947f0869a9 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
@@ -17,8 +17,10 @@
package org.apache.ignite.internal.metastorage.impl;
+import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
+import static
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
import static
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.findLocalAddresses;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
@@ -193,7 +195,9 @@ public class ItMetaStorageWatchTest extends
IgniteAbstractTest {
clock,
topologyAwareRaftGroupServiceFactory,
new NoOpMetricManager(),
- metaStorageConfiguration
+ metaStorageConfiguration,
+ raftConfiguration.retryTimeout(),
+ completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
);
components.add(metaStorageManager);
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java
index 27d09f80a5..50894e927e 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java
@@ -17,8 +17,10 @@
package org.apache.ignite.internal.metastorage.server.raft;
+import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.toSet;
+import static
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
import static
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.findLocalAddresses;
import static
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.waitForTopology;
import static org.apache.ignite.internal.raft.server.RaftGroupOptions.defaults;
@@ -406,18 +408,45 @@ public class ItMetaStorageRaftGroupTest extends
IgniteAbstractTest {
var raftNodeId1 = new RaftNodeId(MetastorageGroupId.INSTANCE,
membersConfiguration.peer(localMemberName(cluster.get(0))));
- metaStorageRaftSrv1.startRaftNode(raftNodeId1, membersConfiguration,
- new MetaStorageListener(mockStorage,
mock(ClusterTimeImpl.class)), defaults());
+ metaStorageRaftSrv1.startRaftNode(
+ raftNodeId1,
+ membersConfiguration,
+ new MetaStorageListener(
+ mockStorage,
+ mock(ClusterTimeImpl.class),
+ raftConfiguration.retryTimeout(),
+ completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
+ ),
+ defaults()
+ );
var raftNodeId2 = new RaftNodeId(MetastorageGroupId.INSTANCE,
membersConfiguration.peer(localMemberName(cluster.get(1))));
- metaStorageRaftSrv2.startRaftNode(raftNodeId2, membersConfiguration,
- new MetaStorageListener(mockStorage,
mock(ClusterTimeImpl.class)), defaults());
+ metaStorageRaftSrv2.startRaftNode(
+ raftNodeId2,
+ membersConfiguration,
+ new MetaStorageListener(
+ mockStorage,
+ mock(ClusterTimeImpl.class),
+ raftConfiguration.retryTimeout(),
+ completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
+ ),
+ defaults()
+ );
var raftNodeId3 = new RaftNodeId(MetastorageGroupId.INSTANCE,
membersConfiguration.peer(localMemberName(cluster.get(2))));
- metaStorageRaftSrv3.startRaftNode(raftNodeId3, membersConfiguration,
- new MetaStorageListener(mockStorage,
mock(ClusterTimeImpl.class)), defaults());
+ metaStorageRaftSrv3.startRaftNode(
+ raftNodeId3,
+ membersConfiguration,
+ new MetaStorageListener(
+ mockStorage,
+ mock(ClusterTimeImpl.class),
+ raftConfiguration.retryTimeout(),
+ completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
+ ),
+ defaults()
+ );
metaStorageRaftGrpSvc1 =
waitForRaftGroupServiceSafely(RaftGroupServiceImpl.start(
MetastorageGroupId.INSTANCE,
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index cb63d3ddf9..01f87b37a5 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -30,6 +30,8 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.LongSupplier;
+import org.apache.ignite.configuration.ConfigurationValue;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
import org.apache.ignite.internal.hlc.HybridClock;
@@ -130,6 +132,14 @@ public class MetaStorageManagerImpl implements
MetaStorageManager {
private volatile MetaStorageConfiguration metaStorageConfiguration;
+ private final ConfigurationValue<Long> idempotentCacheTtl;
+
+ private final CompletableFuture<LongSupplier> maxClockSkewMillisFuture;
+
+ private volatile MetaStorageListener followerListener;
+
+ private volatile MetaStorageListener learnerListener;
+
/**
* The constructor.
*
@@ -140,6 +150,7 @@ public class MetaStorageManagerImpl implements
MetaStorageManager {
* @param storage Storage. This component owns this resource and will
manage its lifecycle.
* @param clock A hybrid logical clock.
* @param metricManager Metric manager.
+ * @param maxClockSkewMillisFuture Future with maximum clock skew in
milliseconds.
*/
public MetaStorageManagerImpl(
ClusterService clusterService,
@@ -149,7 +160,9 @@ public class MetaStorageManagerImpl implements
MetaStorageManager {
KeyValueStorage storage,
HybridClock clock,
TopologyAwareRaftGroupServiceFactory
topologyAwareRaftGroupServiceFactory,
- MetricManager metricManager
+ MetricManager metricManager,
+ ConfigurationValue<Long> idempotentCacheTtl,
+ CompletableFuture<LongSupplier> maxClockSkewMillisFuture
) {
this.clusterService = clusterService;
this.raftMgr = raftMgr;
@@ -157,9 +170,11 @@ public class MetaStorageManagerImpl implements
MetaStorageManager {
this.logicalTopologyService = logicalTopologyService;
this.storage = storage;
this.clusterTime = new ClusterTimeImpl(clusterService.nodeName(),
busyLock, clock);
- metaStorageMetricSource = new MetaStorageMetricSource(clusterTime);
+ this.metaStorageMetricSource = new
MetaStorageMetricSource(clusterTime);
this.topologyAwareRaftGroupServiceFactory =
topologyAwareRaftGroupServiceFactory;
this.metricManager = metricManager;
+ this.idempotentCacheTtl = idempotentCacheTtl;
+ this.maxClockSkewMillisFuture = maxClockSkewMillisFuture;
}
/**
@@ -175,9 +190,22 @@ public class MetaStorageManagerImpl implements
MetaStorageManager {
HybridClock clock,
TopologyAwareRaftGroupServiceFactory
topologyAwareRaftGroupServiceFactory,
MetricManager metricManager,
- MetaStorageConfiguration configuration
+ MetaStorageConfiguration configuration,
+ ConfigurationValue<Long> idempotentCacheTtl,
+ CompletableFuture<LongSupplier> maxClockSkewMillisFuture
) {
- this(clusterService, cmgMgr, logicalTopologyService, raftMgr, storage,
clock, topologyAwareRaftGroupServiceFactory, metricManager);
+ this(
+ clusterService,
+ cmgMgr,
+ logicalTopologyService,
+ raftMgr,
+ storage,
+ clock,
+ topologyAwareRaftGroupServiceFactory,
+ metricManager,
+ idempotentCacheTtl,
+ maxClockSkewMillisFuture
+ );
configure(configuration);
}
@@ -293,10 +321,17 @@ public class MetaStorageManagerImpl implements
MetaStorageManager {
assert localMetaStorageConfiguration != null : "Meta Storage
configuration has not been set";
+ followerListener = new MetaStorageListener(
+ storage,
+ clusterTime,
+ idempotentCacheTtl,
+ maxClockSkewMillisFuture
+ );
+
CompletableFuture<TopologyAwareRaftGroupService> raftServiceFuture =
raftMgr.startRaftGroupNodeAndWaitNodeReadyFuture(
new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer),
configuration,
- new MetaStorageListener(storage, clusterTime),
+ followerListener,
RaftGroupEventsListener.noopLsnr,
disruptorConfig,
topologyAwareRaftGroupServiceFactory
@@ -334,10 +369,17 @@ public class MetaStorageManagerImpl implements
MetaStorageManager {
assert localPeer != null;
+ learnerListener = new MetaStorageListener(
+ storage,
+ clusterTime,
+ idempotentCacheTtl,
+ maxClockSkewMillisFuture
+ );
+
return raftMgr.startRaftGroupNodeAndWaitNodeReadyFuture(
new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer),
configuration,
- new MetaStorageListener(storage, clusterTime),
+ learnerListener,
RaftGroupEventsListener.noopLsnr,
disruptorConfig
);
@@ -845,4 +887,19 @@ public class MetaStorageManagerImpl implements
MetaStorageManager {
public CompletableFuture<Void> notifyRevisionUpdateListenerOnStart() {
return
recoveryFinishedFuture.thenCompose(storage::notifyRevisionUpdateListenerOnStart);
}
+
+ /**
+ * Removes obsolete entries from the volatile and persistent idempotent command cache of the follower and learner listeners that exist.
+ */
+ @TestOnly
+ @Deprecated(forRemoval = true)
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-19417 cache eviction
should be triggered by MS GC instead.
+ public void evictIdempotentCommandsCache() {
+ if (followerListener != null) {
+ followerListener.evictIdempotentCommandsCache();
+ }
+ if (learnerListener != null) {
+ learnerListener.evictIdempotentCommandsCache();
+ }
+ }
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
index a49e594983..163bd2dcad 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
@@ -24,7 +24,11 @@ import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
+import java.util.function.LongSupplier;
+import org.apache.ignite.configuration.ConfigurationValue;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.command.GetAllCommand;
@@ -44,6 +48,7 @@ import org.apache.ignite.internal.raft.service.CommandClosure;
import org.apache.ignite.internal.raft.service.RaftGroupListener;
import org.apache.ignite.internal.util.Cursor;
import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
/**
* Meta storage listener.
@@ -60,9 +65,19 @@ public class MetaStorageListener implements
RaftGroupListener, BeforeApplyHandle
*
* @param storage Storage.
*/
- public MetaStorageListener(KeyValueStorage storage, ClusterTimeImpl
clusterTime) {
+ public MetaStorageListener(
+ KeyValueStorage storage,
+ ClusterTimeImpl clusterTime,
+ ConfigurationValue<Long> idempotentCacheTtl,
+ CompletableFuture<LongSupplier> maxClockSkewMillisFuture
+ ) {
this.storage = storage;
- this.writeHandler = new MetaStorageWriteHandler(storage, clusterTime);
+ this.writeHandler = new MetaStorageWriteHandler(
+ storage,
+ clusterTime,
+ idempotentCacheTtl,
+ maxClockSkewMillisFuture
+ );
}
@Override
@@ -174,4 +189,14 @@ public class MetaStorageListener implements
RaftGroupListener, BeforeApplyHandle
@Override
public void onShutdown() {
}
+
+ /**
+ * Removes obsolete idempotent command cache entries via the write handler, passing {@link HybridTimestamp#MIN_VALUE} as safe time.
+ */
+ @TestOnly
+ @Deprecated(forRemoval = true)
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-19417 cache eviction
should be triggered by MS GC instead.
+ public void evictIdempotentCommandsCache() {
+ writeHandler.evictIdempotentCommandsCache(HybridTimestamp.MIN_VALUE);
+ }
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
index 95c3685fb1..33bc88b658 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
@@ -18,13 +18,18 @@
package org.apache.ignite.internal.metastorage.server.raft;
import static java.util.Arrays.copyOfRange;
+import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.util.ByteUtils.byteToBoolean;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.LongSupplier;
+import org.apache.ignite.configuration.ConfigurationValue;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.logger.IgniteLogger;
@@ -61,6 +66,7 @@ import
org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl;
import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.WriteCommand;
import org.apache.ignite.internal.raft.service.CommandClosure;
+import org.apache.ignite.internal.util.ArrayUtils;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.Cursor;
import org.jetbrains.annotations.Nullable;
@@ -81,9 +87,20 @@ public class MetaStorageWriteHandler {
private final Map<CommandId, IdempotentCommandCachedResult>
idempotentCommandCache = new ConcurrentHashMap<>();
- MetaStorageWriteHandler(KeyValueStorage storage, ClusterTimeImpl
clusterTime) {
+ private final ConfigurationValue<Long> idempotentCacheTtl;
+
+ private final CompletableFuture<LongSupplier> maxClockSkewMillisFuture;
+
+ MetaStorageWriteHandler(
+ KeyValueStorage storage,
+ ClusterTimeImpl clusterTime,
+ ConfigurationValue<Long> idempotentCacheTtl,
+ CompletableFuture<LongSupplier> maxClockSkewMillisFuture
+ ) {
this.storage = storage;
this.clusterTime = clusterTime;
+ this.idempotentCacheTtl = idempotentCacheTtl;
+ this.maxClockSkewMillisFuture = maxClockSkewMillisFuture;
}
/**
@@ -95,7 +112,14 @@ public class MetaStorageWriteHandler {
CommandClosure<WriteCommand> resultClosure;
if (command instanceof IdempotentCommand) {
- CommandId commandId = ((IdempotentCommand) command).id();
+ IdempotentCommand idempotentCommand = ((IdempotentCommand)
command);
+ CommandId commandId = idempotentCommand.id();
+
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-19417 Remove.
+ if (idempotentCommand.safeTime().getPhysical() % 100 == 0) {
+ evictIdempotentCommandsCache(((IdempotentCommand)
command).safeTime());
+ }
+
IdempotentCommandCachedResult cachedResult =
idempotentCommandCache.get(commandId);
if (cachedResult != null) {
@@ -352,6 +376,54 @@ public class MetaStorageWriteHandler {
}
}
+    /**
+     * Removes obsolete entries from both volatile and persistent idempotent command cache.
+     *
+     * <p>An entry is obsolete when the physical part of its command start time is not greater than the physical part of
+     * the cleanup timestamp (cluster time read when this method is called) minus the sum of the idempotent cache TTL and
+     * the maximum clock skew. The removal runs when {@code maxClockSkewMillisFuture} completes, so it may happen after
+     * this method has returned.
+     *
+     * @param safeTime Trigger operation safe time, passed to the storage when the persisted entries are removed.
+     *     TODO: https://issues.apache.org/jira/browse/IGNITE-19417 Remove.
+     */
+    // TODO: https://issues.apache.org/jira/browse/IGNITE-19417 Call on meta storage compaction.
+    void evictIdempotentCommandsCache(HybridTimestamp safeTime) {
+        HybridTimestamp cleanupTimestamp = clusterTime.now();
+        LOG.info("Idempotent command cache cleanup started [cleanupTimestamp={}].", cleanupTimestamp);
+
+        maxClockSkewMillisFuture.thenAccept(maxClockSkewMillis -> {
+            // Computed once, so that every entry is checked against the same TTL and clock skew values.
+            long obsoleteBoundMillis = cleanupTimestamp.getPhysical()
+                    - (idempotentCacheTtl.value() + maxClockSkewMillis.getAsLong());
+
+            List<CommandId> commandIdsToRemove = idempotentCommandCache.entrySet().stream()
+                    .filter(entry -> entry.getValue().commandStartTime.getPhysical() <= obsoleteBoundMillis)
+                    .map(Map.Entry::getKey)
+                    .collect(toList());
+
+            if (!commandIdsToRemove.isEmpty()) {
+                // NOTE(review): the key prefix is an empty array here - confirm that these keys match the ones under which
+                // the command results are persisted.
+                List<byte[]> commandIdStorageKeys = commandIdsToRemove.stream()
+                        .map(commandId -> ArrayUtils.concat(new byte[]{}, ByteUtils.toBytes(commandId)))
+                        .collect(toList());
+
+                storage.removeAll(commandIdStorageKeys, safeTime);
+
+                commandIdsToRemove.forEach(idempotentCommandCache.keySet()::remove);
+            }
+
+            LOG.info("Idempotent command cache cleanup finished [cleanupTimestamp={}, cleanupCompletionTimestamp={},"
+                    + " removedEntriesCount={}, cacheSize={}].", cleanupTimestamp, clusterTime.now(), commandIdsToRemove.size(),
+                    idempotentCommandCache.size());
+        }).whenComplete((unused, ex) -> {
+            // Nobody observes the resulting future, so a failure of the cleanup would otherwise be lost silently.
+            if (ex != null) {
+                LOG.error("Idempotent command cache cleanup failed [cleanupTimestamp=" + cleanupTimestamp + "].", ex);
+            }
+        });
+    }
+
private static class IdempotentCommandCachedResult {
@Nullable
final Serializable result;
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/IdempotentCommandCacheTest.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/IdempotentCommandCacheTest.java
index 968e129d8c..c1cdd33bfd 100644
---
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/IdempotentCommandCacheTest.java
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/IdempotentCommandCacheTest.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.metastorage.impl;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
import static org.apache.ignite.internal.metastorage.dsl.Operations.ops;
import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
@@ -32,6 +34,8 @@ import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
+import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.lang.ByteArray;
@@ -47,22 +51,27 @@ import
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStora
import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl;
import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.raft.service.CommandClosure;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
/**
* Tests for idempotency of {@link
org.apache.ignite.internal.metastorage.command.IdempotentCommand}.
*/
-public class IdempotentCommandCacheTest {
+@ExtendWith(ConfigurationExtension.class)
+public class IdempotentCommandCacheTest extends BaseIgniteAbstractTest {
private static final String NODE_NAME = "node";
private static final MetaStorageCommandsFactory CMD_FACTORY = new
MetaStorageCommandsFactory();
- private final KeyValueStorage storage;
+ private KeyValueStorage storage;
- private final MetaStorageListener metaStorageListener;
+ private MetaStorageListener metaStorageListener;
private final HybridClock clock = new HybridClockImpl();
@@ -71,14 +80,17 @@ public class IdempotentCommandCacheTest {
private final CommandIdGenerator commandIdGenerator = new
CommandIdGenerator(() -> UUID.randomUUID().toString());
- /**
- * Constructor.
- */
- public IdempotentCommandCacheTest() {
+ @InjectConfiguration
+ private RaftConfiguration raftConfiguration;
+
+ @BeforeEach
+ public void setUp() {
storage = new SimpleInMemoryKeyValueStorage(NODE_NAME);
metaStorageListener = new MetaStorageListener(
storage,
- new ClusterTimeImpl(NODE_NAME, new IgniteSpinBusyLock(), clock)
+ new ClusterTimeImpl(NODE_NAME, new IgniteSpinBusyLock(),
clock),
+ raftConfiguration.retryTimeout(),
+ completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
);
}
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java
index d6fc9c0431..c548f0d734 100644
---
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.metastorage.impl;
import static java.util.concurrent.CompletableFuture.completedFuture;
+import static
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -43,6 +44,7 @@ import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.raft.RaftManager;
import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
import
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
@@ -56,6 +58,9 @@ public class MetaStorageDeployWatchesCorrectnessTest extends
IgniteAbstractTest
@InjectConfiguration
private static MetaStorageConfiguration metaStorageConfiguration;
+ @InjectConfiguration
+ private static RaftConfiguration raftConfiguration;
+
/**
* Returns a stream with test arguments.
*
@@ -87,7 +92,9 @@ public class MetaStorageDeployWatchesCorrectnessTest extends
IgniteAbstractTest
clock,
mock(TopologyAwareRaftGroupServiceFactory.class),
new NoOpMetricManager(),
- metaStorageConfiguration
+ metaStorageConfiguration,
+ raftConfiguration.retryTimeout(),
+ completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
),
StandaloneMetaStorageManager.create()
);
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java
index a8b86c57b2..ac65131cac 100644
---
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.metastorage.impl;
import static java.util.concurrent.CompletableFuture.completedFuture;
+import static
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
@@ -48,6 +49,7 @@ import org.apache.ignite.internal.network.MessagingService;
import
org.apache.ignite.internal.network.serialization.MessageSerializationRegistry;
import org.apache.ignite.internal.raft.RaftManager;
import
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.network.NodeMetadata;
@@ -65,6 +67,9 @@ public class MetaStorageManagerRecoveryTest extends
BaseIgniteAbstractTest {
@InjectConfiguration
private static MetaStorageConfiguration metaStorageConfiguration;
+ @InjectConfiguration
+ private static RaftConfiguration raftConfiguration;
+
private MetaStorageManagerImpl metaStorageManager;
private KeyValueStorage kvs;
@@ -89,7 +94,9 @@ public class MetaStorageManagerRecoveryTest extends
BaseIgniteAbstractTest {
clock,
mock(TopologyAwareRaftGroupServiceFactory.class),
new NoOpMetricManager(),
- metaStorageConfiguration
+ metaStorageConfiguration,
+ raftConfiguration.retryTimeout(),
+ completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
);
}
diff --git
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
index c9b02c071a..1e70463ad3 100644
---
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
+++
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.metastorage.impl;
import static java.util.Collections.singleton;
import static java.util.concurrent.CompletableFuture.completedFuture;
+import static
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -28,6 +29,7 @@ import java.io.Serializable;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.function.LongSupplier;
import org.apache.ignite.configuration.ConfigurationValue;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
@@ -46,6 +48,7 @@ import org.apache.ignite.internal.raft.ReadCommand;
import org.apache.ignite.internal.raft.WriteCommand;
import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
import
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.raft.service.BeforeApplyHandler;
import org.apache.ignite.internal.raft.service.CommandClosure;
import org.apache.ignite.internal.raft.service.RaftGroupListener;
@@ -105,7 +108,9 @@ public class StandaloneMetaStorageManager extends
MetaStorageManagerImpl {
keyValueStorage,
mock(TopologyAwareRaftGroupServiceFactory.class),
mockConfiguration(),
- clock
+ clock,
+ mockRaftConfiguration().retryTimeout(),
+ completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
);
}
@@ -126,7 +131,9 @@ public class StandaloneMetaStorageManager extends
MetaStorageManagerImpl {
KeyValueStorage storage,
TopologyAwareRaftGroupServiceFactory raftServiceFactory,
MetaStorageConfiguration configuration,
- HybridClock clock
+ HybridClock clock,
+ ConfigurationValue<Long> idempotentCacheTtl,
+ CompletableFuture<LongSupplier> maxClockSkewMillisFuture
) {
super(
clusterService,
@@ -137,7 +144,9 @@ public class StandaloneMetaStorageManager extends
MetaStorageManagerImpl {
clock,
raftServiceFactory,
new NoOpMetricManager(),
- configuration
+ configuration,
+ idempotentCacheTtl,
+ maxClockSkewMillisFuture
);
}
@@ -211,6 +220,24 @@ public class StandaloneMetaStorageManager extends
MetaStorageManagerImpl {
return configuration;
}
+    /**
+     * Creates a lenient {@link RaftConfiguration} mock with a stubbed {@code retryTimeout()}.
+     *
+     * <p>The retry timeout is handed to the manager as the idempotent command cache TTL. An unstubbed method of a mock that
+     * uses Mockito's default answer returns {@code null} for such a type, which would leave the manager without a TTL.
+     * NOTE(review): assumes LENIENT_SETTINGS does not change the default answer; 10_000 is an arbitrary test value.
+     */
+    @SuppressWarnings("unchecked")
+    private static RaftConfiguration mockRaftConfiguration() {
+        ConfigurationValue<Long> retryTimeout = mock(ConfigurationValue.class, LENIENT_SETTINGS);
+        when(retryTimeout.value()).thenReturn(10_000L);
+
+        RaftConfiguration raftConfiguration = mock(RaftConfiguration.class, LENIENT_SETTINGS);
+        when(raftConfiguration.retryTimeout()).thenReturn(retryTimeout);
+
+        return raftConfiguration;
+    }
+
private static CompletableFuture<Serializable> runCommand(Command command,
RaftGroupListener listener) {
CompletableFuture<Serializable> future = new CompletableFuture<>();
diff --git
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
index ad403899f3..2282eb86f5 100644
---
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
+++
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
@@ -43,6 +43,7 @@ import java.util.stream.IntStream;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.TestClockService;
@@ -254,6 +255,8 @@ public class MultiActorPlacementDriverTest extends
BasePlacementDriverTest {
var storage = new SimpleInMemoryKeyValueStorage(nodeName);
+ ClockService clockService = new TestClockService(nodeClock);
+
var metaStorageManager = new MetaStorageManagerImpl(
clusterService,
cmgManager,
@@ -263,7 +266,9 @@ public class MultiActorPlacementDriverTest extends
BasePlacementDriverTest {
nodeClock,
topologyAwareRaftGroupServiceFactory,
new NoOpMetricManager(),
- metaStorageConfiguration
+ metaStorageConfiguration,
+ raftConfiguration.retryTimeout(),
+ completedFuture(clockService::maxClockSkewMillis)
);
if (this.metaStorageManager == null) {
@@ -279,7 +284,7 @@ public class MultiActorPlacementDriverTest extends
BasePlacementDriverTest {
logicalTopologyService,
raftManager,
topologyAwareRaftGroupServiceFactory,
- new TestClockService(nodeClock)
+ clockService
);
res.add(new Node(nodeName, clusterService, raftManager,
metaStorageManager, placementDriverManager));
diff --git
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
index b7cda99359..f586f89e0b 100644
---
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
+++
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
@@ -63,6 +63,7 @@ import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopolog
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -191,6 +192,8 @@ public class PlacementDriverManagerTest extends
BasePlacementDriverTest {
var storage = new SimpleInMemoryKeyValueStorage(nodeName);
+ ClockService clockService = new TestClockService(nodeClock);
+
metaStorageManager = new MetaStorageManagerImpl(
clusterService,
cmgManager,
@@ -200,7 +203,9 @@ public class PlacementDriverManagerTest extends
BasePlacementDriverTest {
nodeClock,
topologyAwareRaftGroupServiceFactory,
new NoOpMetricManager(),
- metaStorageConfiguration
+ metaStorageConfiguration,
+ raftConfiguration.retryTimeout(),
+ completedFuture(clockService::maxClockSkewMillis)
);
placementDriverManager = new PlacementDriverManager(
@@ -212,7 +217,7 @@ public class PlacementDriverManagerTest extends
BasePlacementDriverTest {
logicalTopologyService,
raftManager,
topologyAwareRaftGroupServiceFactory,
- new TestClockService(nodeClock)
+ clockService
);
ComponentContext componentContext = new ComponentContext();
diff --git
a/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/service/ItAbstractListenerSnapshotTest.java
b/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/service/ItAbstractListenerSnapshotTest.java
index 01e8aebd08..2913279d95 100644
---
a/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/service/ItAbstractListenerSnapshotTest.java
+++
b/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/service/ItAbstractListenerSnapshotTest.java
@@ -107,7 +107,7 @@ public abstract class ItAbstractListenerSnapshotTest<T
extends RaftGroupListener
private ScheduledExecutorService executor;
@InjectConfiguration
- private RaftConfiguration raftConfiguration;
+ protected RaftConfiguration raftConfiguration;
/**
* Create executor for raft group services.
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
index 4471754bff..ee6af33877 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
@@ -18,7 +18,9 @@
package org.apache.ignite.internal.configuration;
import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toUnmodifiableList;
+import static
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
@@ -209,7 +211,9 @@ public class ItDistributedConfigurationPropertiesTest
extends BaseIgniteAbstract
clock,
topologyAwareRaftGroupServiceFactory,
new NoOpMetricManager(),
- metaStorageConfiguration
+ metaStorageConfiguration,
+ raftConfiguration.retryTimeout(),
+ completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
);
deployWatchesFut = metaStorageManager.deployWatches();
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
index e6cc4ba817..04cdc3d132 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.configuration.storage;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
@@ -181,7 +183,9 @@ public class ItDistributedConfigurationStorageTest extends
BaseIgniteAbstractTes
clock,
topologyAwareRaftGroupServiceFactory,
new NoOpMetricManager(),
- metaStorageConfiguration
+ metaStorageConfiguration,
+ raftConfiguration.retryTimeout(),
+ completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
);
deployWatchesFut = metaStorageManager.deployWatches();
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 80051692c6..e85b913e65 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -417,6 +417,8 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
InvokeInterceptor metaStorageInvokeInterceptor =
metaStorageInvokeInterceptorByNode.get(idx);
+ CompletableFuture<LongSupplier> maxClockSkewFuture = new
CompletableFuture<>();
+
var metaStorageMgr = new MetaStorageManagerImpl(
clusterSvc,
cmgManager,
@@ -426,7 +428,9 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
hybridClock,
topologyAwareRaftGroupServiceFactory,
metricManager,
- metaStorageConfiguration
+ metaStorageConfiguration,
+ raftConfiguration.retryTimeout(),
+ maxClockSkewFuture
) {
@Override
public CompletableFuture<Boolean> invoke(Condition condition,
Collection<Operation> success, Collection<Operation> failure) {
@@ -464,12 +468,15 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
SchemaSynchronizationConfiguration schemaSyncConfiguration =
clusterConfigRegistry.getConfiguration(
SchemaSynchronizationConfiguration.KEY
);
+
ClockService clockService = new ClockServiceImpl(
hybridClock,
clockWaiter,
() -> schemaSyncConfiguration.maxClockSkew().value()
);
+ maxClockSkewFuture.complete(clockService::maxClockSkewMillis);
+
var placementDriverManager = new PlacementDriverManager(
name,
metaStorageMgr,
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index e966a158e3..80b612e8c0 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -576,6 +576,8 @@ public class IgniteImpl implements Ignite {
raftGroupEventsClientListener
);
+ CompletableFuture<LongSupplier> maxClockSkewMillisFuture = new
CompletableFuture<>();
+
metaStorageMgr = new MetaStorageManagerImpl(
clusterSvc,
cmgMgr,
@@ -584,7 +586,9 @@ public class IgniteImpl implements Ignite {
new RocksDbKeyValueStorage(name,
workDir.resolve(METASTORAGE_DB_PATH), failureProcessor),
clock,
topologyAwareRaftGroupServiceFactory,
- metricManager
+ metricManager,
+ raftConfiguration.retryTimeout(),
+ maxClockSkewMillisFuture
);
this.cfgStorage = new DistributedConfigurationStorage(name,
metaStorageMgr);
@@ -606,6 +610,8 @@ public class IgniteImpl implements Ignite {
clockService = new ClockServiceImpl(clock, clockWaiter, new
SameValueLongSupplier(() -> schemaSyncConfig.maxClockSkew().value()));
+ maxClockSkewMillisFuture.complete(clockService::maxClockSkewMillis);
+
Consumer<LongFunction<CompletableFuture<?>>> registry = c ->
metaStorageMgr.registerRevisionUpdateListener(c::apply);
placementDriverMgr = new PlacementDriverManager(
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 9306da8ddd..71afc3cbad 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.rebalance;
import static java.util.Collections.reverse;
import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toSet;
@@ -33,6 +34,7 @@ import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUt
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.plannedPartAssignmentsKey;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static
org.apache.ignite.internal.testframework.TestIgnitionManager.DEFAULT_MAX_CLOCK_SKEW_MS;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
@@ -1101,7 +1103,9 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest {
hybridClock,
topologyAwareRaftGroupServiceFactory,
metricManager,
- metaStorageConfiguration
+ metaStorageConfiguration,
+ raftConfiguration.retryTimeout(),
+ completedFuture(() -> DEFAULT_MAX_CLOCK_SKEW_MS)
);
var placementDriver = new TestPlacementDriver(() ->
PRIMARY_FILTER.apply(clusterService.topologyService().allMembers()));
@@ -1125,7 +1129,7 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest {
ClockService clockService = new ClockServiceImpl(
hybridClock,
clockWaiter,
- () -> TestIgnitionManager.DEFAULT_MAX_CLOCK_SKEW_MS
+ () -> DEFAULT_MAX_CLOCK_SKEW_MS
);
TransactionInflights transactionInflights = new
TransactionInflights(placementDriver, clockService);