This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new eeac526827 IGNITE-22819 Fix Metastorage revisions inconsistency (#4143)
eeac526827 is described below
commit eeac5268276251dcaadc04d23c54821a6c4d9806
Author: Alexander Lapin <[email protected]>
AuthorDate: Fri Aug 2 15:02:58 2024 +0300
IGNITE-22819 Fix Metastorage revisions inconsistency (#4143)
---
.../org/apache/ignite/internal/util/ByteUtils.java | 20 ++
.../DistributionZoneRebalanceEngineTest.java | 13 +-
.../RebalanceUtilUpdateAssignmentsTest.java | 14 +-
modules/jacoco-report/build.gradle | 1 +
modules/metastorage-cache/README.md | 10 +
modules/metastorage-cache/build.gradle | 33 +++
.../cache/IdempotentCacheVacuumizer.java | 165 +++++++++++++++
.../cache/IdempotentCacheVacuumizerTest.java | 225 +++++++++++++++++++++
.../impl/ItIdempotentCommandCacheTest.java | 11 +-
.../impl/ItMetaStorageManagerImplTest.java | 9 +-
.../ItMetaStorageMultipleNodesAbstractTest.java | 6 +-
.../impl/ItMetaStorageServicePersistenceTest.java | 9 +-
.../metastorage/impl/ItMetaStorageServiceTest.java | 12 +-
.../metastorage/impl/ItMetaStorageWatchTest.java | 6 +-
.../server/raft/ItMetaStorageRaftGroupTest.java | 25 +--
.../EvictIdempotentCommandsCacheCommand.java | 30 +++
.../command/MetastorageCommandsMessageGroup.java | 3 +
.../impl/MetaStorageLeaderElectionListener.java | 11 +-
.../metastorage/impl/MetaStorageManagerImpl.java | 70 ++-----
.../metastorage/impl/MetaStorageServiceImpl.java | 32 +++
.../metastorage/server/KeyValueStorage.java | 8 +
.../server/persistence/RocksDbKeyValueStorage.java | 39 ++--
.../server/raft/MetaStorageListener.java | 23 +--
.../server/raft/MetaStorageWriteHandler.java | 93 ++++-----
.../impl/IdempotentCommandCacheTest.java | 14 +-
.../MetaStorageDeployWatchesCorrectnessTest.java | 9 +-
.../impl/MetaStorageManagerRecoveryTest.java | 9 +-
.../server/BasicOperationsKeyValueStorageTest.java | 72 ++++++-
.../impl/StandaloneMetaStorageManager.java | 14 +-
.../server/SimpleInMemoryKeyValueStorage.java | 21 +-
.../replicator/ItReplicaLifecycleTest.java | 8 +-
.../MultiActorPlacementDriverTest.java | 4 +-
.../PlacementDriverManagerTest.java | 4 +-
.../service/ItAbstractListenerSnapshotTest.java | 2 +-
modules/runner/build.gradle | 2 +
.../ItDistributedConfigurationPropertiesTest.java | 6 +-
.../ItDistributedConfigurationStorageTest.java | 6 +-
.../runner/app/ItIgniteNodeRestartTest.java | 4 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 26 ++-
.../rebalance/ItRebalanceDistributedTest.java | 5 +-
settings.gradle | 2 +
41 files changed, 756 insertions(+), 320 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/ByteUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/ByteUtils.java
index 8efda24489..86563608ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ByteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ByteUtils.java
@@ -268,6 +268,26 @@ public class ByteUtils {
}
}
+ /**
+ * Deserializes an object from byte array using native java serialization
mechanism.
+ *
+ * @param bytes Byte array.
+ * @param from Offset in the byte array of the first byte to read.
+ * @param length Maximum number of bytes to read from the byte array.
+ * @return Object.
+ */
+ // TODO https://issues.apache.org/jira/browse/IGNITE-22894 Extend test
coverage.
+ public static <T> T fromBytes(byte[] bytes, int from, int length) {
+ try (
+ var bis = new ByteArrayInputStream(bytes, from, length);
+ var in = new ObjectInputStream(bis)
+ ) {
+ return (T) in.readObject();
+ } catch (IOException | ClassNotFoundException e) {
+ throw new IgniteInternalException("Could not deserialize an
object", e);
+ }
+ }
+
/**
* Converts a string to a byte array using {@link StandardCharsets#UTF_8},
{@code null} if {@code s} is {@code null}.
*
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
index b08c23f7d6..baf31c1b7f 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
@@ -30,7 +30,6 @@ import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey;
import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
-import static
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
import static org.apache.ignite.internal.table.TableTestUtils.getTableIdStrict;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
@@ -70,7 +69,6 @@ import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.commands.ColumnParams;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
-import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
import org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil;
import org.apache.ignite.internal.distributionzones.Node;
@@ -99,7 +97,6 @@ import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.WriteCommand;
-import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.raft.service.CommandClosure;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.TablePartitionId;
@@ -140,9 +137,6 @@ public class DistributionZoneRebalanceEngineTest extends
IgniteAbstractTest {
private CatalogManager catalogManager;
- @InjectConfiguration
- private RaftConfiguration raftConfiguration;
-
@BeforeEach
public void setUp() {
String nodeName = "test";
@@ -184,12 +178,7 @@ public class DistributionZoneRebalanceEngineTest extends
IgniteAbstractTest {
ClusterTimeImpl clusterTime = new ClusterTimeImpl("node", new
IgniteSpinBusyLock(), clock);
- MetaStorageListener metaStorageListener = new MetaStorageListener(
- keyValueStorage,
- clusterTime,
- raftConfiguration.retryTimeout(),
- completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
- );
+ MetaStorageListener metaStorageListener = new
MetaStorageListener(keyValueStorage, clusterTime);
RaftGroupService metaStorageService = mock(RaftGroupService.class);
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java
index e87547b909..0fd9dc6979 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java
@@ -17,13 +17,11 @@
package org.apache.ignite.internal.distributionzones.rebalance;
-import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toSet;
import static
org.apache.ignite.internal.affinity.AffinityUtils.calculateAssignmentForPartition;
import static org.apache.ignite.internal.affinity.Assignments.toBytes;
import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
-import static
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -46,7 +44,6 @@ import org.apache.ignite.internal.affinity.Assignments;
import
org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
-import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -66,7 +63,6 @@ import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.WriteCommand;
-import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.raft.service.CommandClosure;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.TablePartitionId;
@@ -110,9 +106,6 @@ public class RebalanceUtilUpdateAssignmentsTest extends
IgniteAbstractTest {
private final HybridClock clock = new HybridClockImpl();
- @InjectConfiguration
- private RaftConfiguration raftConfiguration;
-
private static final int partNum = 2;
private static final int replicas = 2;
@@ -138,12 +131,7 @@ public class RebalanceUtilUpdateAssignmentsTest extends
IgniteAbstractTest {
ClusterTimeImpl clusterTime = new ClusterTimeImpl("node", new
IgniteSpinBusyLock(), clock);
- MetaStorageListener metaStorageListener = new MetaStorageListener(
- keyValueStorage,
- clusterTime,
- raftConfiguration.retryTimeout(),
- completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
- );
+ MetaStorageListener metaStorageListener = new
MetaStorageListener(keyValueStorage, clusterTime);
RaftGroupService metaStorageService = mock(RaftGroupService.class);
diff --git a/modules/jacoco-report/build.gradle
b/modules/jacoco-report/build.gradle
index 5260664c55..4ff0312858 100644
--- a/modules/jacoco-report/build.gradle
+++ b/modules/jacoco-report/build.gradle
@@ -31,6 +31,7 @@ dependencies {
implementation project(':ignite-core')
implementation project(':ignite-metastorage-api')
implementation project(':ignite-metastorage')
+ implementation project(':ignite-metastorage-cache')
implementation project(':ignite-network')
implementation project(':ignite-network-api')
implementation project(':ignite-raft')
diff --git a/modules/metastorage-cache/README.md
b/modules/metastorage-cache/README.md
new file mode 100644
index 0000000000..0983a81056
--- /dev/null
+++ b/modules/metastorage-cache/README.md
@@ -0,0 +1,10 @@
+# Metastorage idempotent command cache eviction module
+
+Module responsible for metastorage idempotent command cache eviction.
+
+Metastorage idempotent command cache is a mapping of commandId -> command
evaluation result that is used to store the results of
+the invoke and multi-invoke commands in order not to re-evaluate the invoke
condition in case of an operation retry. By definition, it's
+necessary to store such results for the command-processing-timeout + max clock
skew; thus, after the given interval, the corresponding cached command
+may be evicted. IdempotentCacheVacuumizer is an actor that triggers such
evictions. It would be reasonable to put it inside the metastorage module
+itself instead of creating a new one; however, that's not possible because of
a cyclic dependency. IdempotentCacheVacuumizer requires maxClockSkew,
+which is stored in the distributed configuration, which in its turn requires
metastorage. That's why the new module was introduced.
\ No newline at end of file
diff --git a/modules/metastorage-cache/build.gradle
b/modules/metastorage-cache/build.gradle
new file mode 100644
index 0000000000..a4ac189796
--- /dev/null
+++ b/modules/metastorage-cache/build.gradle
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+apply from: "$rootDir/buildscripts/java-core.gradle"
+apply from: "$rootDir/buildscripts/publishing.gradle"
+apply from: "$rootDir/buildscripts/java-junit5.gradle"
+
+description = 'ignite-metastorage-cache'
+
+dependencies {
+ implementation project(':ignite-core')
+ implementation project(':ignite-metastorage')
+ implementation libs.jetbrains.annotations
+
+ testImplementation testFixtures(project(":ignite-core"))
+ testImplementation libs.mockito.junit
+ testImplementation libs.mockito.core
+ testImplementation libs.hamcrest.core
+}
diff --git
a/modules/metastorage-cache/src/main/java/org/apache/ignite/internal/metastorage/cache/IdempotentCacheVacuumizer.java
b/modules/metastorage-cache/src/main/java/org/apache/ignite/internal/metastorage/cache/IdempotentCacheVacuumizer.java
new file mode 100644
index 0000000000..bad5415364
--- /dev/null
+++
b/modules/metastorage-cache/src/main/java/org/apache/ignite/internal/metastorage/cache/IdempotentCacheVacuumizer.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.cache;
+
+import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import org.apache.ignite.configuration.ConfigurationValue;
+import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.metastorage.impl.ElectionListener;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * Scheduler wrapper that triggers idempotent cache vacuumization with an
ability to suspend and resume the triggering. It is valid but not
+ * effective to have multiple vacuumizers at the same time, meaning that
best-effort uniqueness is preferable. In order to achieve such
+ * best-effort uniqueness it's possible to use meta storage leader
collocation: start/resume triggering on leader election if the leader is
+ * collocated with a local node, and suspend upon loss of collocation with the
leader.
+ * In case of exception within vacuumization action, vacuumizer will just log
a warning without suspending the scheduler.
+ */
+public class IdempotentCacheVacuumizer implements IgniteComponent,
ElectionListener {
+ private static final IgniteLogger LOG =
Loggers.forClass(IdempotentCacheVacuumizer.class);
+
+ private final AtomicBoolean triggerVacuumization;
+
+ private final String nodeName;
+
+ /** Scheduler to run vacuumization actions. */
+ private final ScheduledExecutorService scheduler;
+
+ /** Action that will trigger vacuumization process. */
+ private final Consumer<HybridTimestamp> vacuumizationAction;
+
+ /** Idempotent cache ttl. */
+ private final ConfigurationValue<Long> idempotentCacheTtl;
+
+ /** Clock service. */
+ private final ClockService clockService;
+
+ /** The time to delay first execution. */
+ private final long initialDelay;
+
+ /** The delay between the termination of one execution and the
commencement of the next. */
+ private final long delay;
+
+ /** The time unit of the initialDelay and delay parameters. */
+ private final TimeUnit unit;
+
+ /** Vacuumization task future. */
+ private volatile ScheduledFuture<?> scheduledFuture;
+
+ /**
+ * The constructor.
+ *
+ * @param nodeName Node name.
+ * @param scheduler Scheduler to run vacuumization actions.
+ * @param vacuumizationAction Action that will trigger vacuumization
process.
+ * @param idempotentCacheTtl Idempotent cache ttl.
+ * @param clockService Clock service.
+ * @param initialDelay The time to delay first execution.
+ * @param delay The delay between the termination of one execution and the
commencement of the next.
+ * @param unit The time unit of the initialDelay and delay parameters.
+ */
+ public IdempotentCacheVacuumizer(
+ String nodeName,
+ ScheduledExecutorService scheduler,
+ Consumer<HybridTimestamp> vacuumizationAction,
+ ConfigurationValue<Long> idempotentCacheTtl,
+ ClockService clockService,
+ long initialDelay,
+ long delay,
+ TimeUnit unit
+ ) {
+ this.nodeName = nodeName;
+ this.triggerVacuumization = new AtomicBoolean(false);
+ this.scheduler = scheduler;
+ this.vacuumizationAction = vacuumizationAction;
+ this.idempotentCacheTtl = idempotentCacheTtl;
+ this.clockService = clockService;
+ this.initialDelay = initialDelay;
+ this.delay = delay;
+ this.unit = unit;
+ }
+
+ @Override
+ public CompletableFuture<Void> startAsync(ComponentContext
componentContext) {
+ scheduledFuture = scheduler.scheduleWithFixedDelay(
+ () -> {
+ if (triggerVacuumization.get()) {
+ try {
+
vacuumizationAction.accept(hybridTimestamp(clockService.nowLong()
+ - (idempotentCacheTtl.value() +
clockService.maxClockSkewMillis())));
+ } catch (Exception e) {
+ LOG.warn("An exception occurred while executing
idempotent cache vacuumization action."
+ + " Idempotent cache vacuumizer won't be
stopped.", e);
+ }
+ }
+ },
+ initialDelay,
+ delay,
+ unit
+ );
+
+ return nullCompletedFuture();
+ }
+
+ @Override
+ public CompletableFuture<Void> stopAsync(ComponentContext
componentContext) {
+ if (scheduledFuture != null) {
+ scheduledFuture.cancel(false);
+ }
+
+ return nullCompletedFuture();
+ }
+
+ @Override
+ public void onLeaderElected(ClusterNode newLeader) {
+ if (newLeader.name().equals(nodeName)) {
+ startLocalVacuumizationTriggering();
+ } else {
+ suspendLocalVacuumizationTriggering();
+ }
+ }
+
+ /**
+ * Starts local vacuumization triggering. Will have no effect if the
vacuumizer was previously stopped.
+ */
+ void startLocalVacuumizationTriggering() {
+ triggerVacuumization.set(true);
+ LOG.info("Idempotent cache vacuumizer started.");
+ }
+
+ /**
+ * Suspends further local vacuumization triggering. Will have no effect if the
vacuumizer was previously stopped.
+ */
+ void suspendLocalVacuumizationTriggering() {
+ triggerVacuumization.set(false);
+ LOG.info("Idempotent cache vacuumizer suspended.");
+ }
+}
diff --git
a/modules/metastorage-cache/src/test/java/org/apache/ignite/internal/metastorage/cache/IdempotentCacheVacuumizerTest.java
b/modules/metastorage-cache/src/test/java/org/apache/ignite/internal/metastorage/cache/IdempotentCacheVacuumizerTest.java
new file mode 100644
index 0000000000..31c29ef938
--- /dev/null
+++
b/modules/metastorage-cache/src/test/java/org/apache/ignite/internal/metastorage/cache/IdempotentCacheVacuumizerTest.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.cache;
+
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.withSettings;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import org.apache.ignite.configuration.ConfigurationValue;
+import org.apache.ignite.internal.TestHybridClock;
+import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.hlc.TestClockService;
+import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockSettings;
+import org.mockito.quality.Strictness;
+
+/**
+ * Tests for {@link IdempotentCacheVacuumizer}.
+ */
+public class IdempotentCacheVacuumizerTest extends BaseIgniteAbstractTest {
+ private static final int TOUCH_COUNTER_CHANGE_TIMEOUT_MILLIS = 1_000;
+
+ private static final MockSettings LENIENT_SETTINGS =
withSettings().strictness(Strictness.LENIENT);
+
+ private ScheduledExecutorService scheduler;
+
+ private ClockService clocService;
+
+ private ConfigurationValue<Long> idempotentCacheTtlConfigurationValue;
+
+ private IdempotentCacheVacuumizer vacuumizer;
+
+ @BeforeEach
+ public void setup() {
+ scheduler = Executors.newSingleThreadScheduledExecutor();
+ clocService = new TestClockService(new TestHybridClock(() -> 1L));
+
+ idempotentCacheTtlConfigurationValue = mock(ConfigurationValue.class,
LENIENT_SETTINGS);
+ when(idempotentCacheTtlConfigurationValue.value()).thenReturn(0L);
+ }
+
+ @AfterEach
+ public void tearDown() {
+ if (vacuumizer != null) {
+ vacuumizer.beforeNodeStop();
+ assertThat(vacuumizer.stopAsync(), willCompleteSuccessfully());
+ }
+
+ scheduler.shutdown();
+ }
+
+ /**
+ * Check that IdempotentCacheVacuumizer triggers vacuumization action.
+ * <ol>
+ * <li>Ensure that until starting, vacuumizer will not trigger the
vacuumization action.</li>
+ * <li>Start vacuumization triggering and verify that vacuumization
action was called.</li>
+ * <li>Suspend vacuumization triggering and verify that vacuumization
action calls were suspended.</li>
+ * <li>Start vacuumization triggering and verify that vacuumization
action was called.</li>
+ * </ol>
+ *
+ * @throws Exception if Thread.sleep() was interrupted.
+ */
+ @Test
+ public void testIdempotentCacheVacuumizer() throws Exception {
+ AtomicInteger touchCounter = new AtomicInteger(0);
+
+ IdempotentCacheVacuumizer vacuumizer = new IdempotentCacheVacuumizer(
+ "Node1",
+ scheduler,
+ ignored -> touchCounter.incrementAndGet(),
+ idempotentCacheTtlConfigurationValue,
+ clocService,
+ 0,
+ 1,
+ TimeUnit.MILLISECONDS
+ );
+
+ assertThat(vacuumizer.startAsync(new ComponentContext()),
willCompleteSuccessfully());
+
+ // Ensure that until starting, vacuumizer will not trigger the
vacuumization action. It's a best-effort check.
+ Thread.sleep(10);
+ assertEquals(0, touchCounter.get());
+
+ // Start vacuumization triggering and verify that vacuumization action
was called.
+ vacuumizer.startLocalVacuumizationTriggering();
+ assertTrue(waitForCondition(
+ () -> touchCounter.get() > 0,
+ TOUCH_COUNTER_CHANGE_TIMEOUT_MILLIS)
+ );
+
+ // Suspend vacuumization triggering and verify that vacuumization
action calls were suspended.
+ vacuumizer.suspendLocalVacuumizationTriggering();
+ int touchCounterAfterStopTriggered = touchCounter.get();
+ assertTrue(waitForCondition(
+ () -> touchCounter.get() == touchCounterAfterStopTriggered ||
touchCounter.get() == touchCounterAfterStopTriggered + 1,
+ TOUCH_COUNTER_CHANGE_TIMEOUT_MILLIS)
+ );
+
+ // Start vacuumization triggering and verify that vacuumization action
was called.
+ vacuumizer.startLocalVacuumizationTriggering();
+ assertTrue(waitForCondition(
+ () -> touchCounter.get() > touchCounterAfterStopTriggered + 1,
+ TOUCH_COUNTER_CHANGE_TIMEOUT_MILLIS)
+ );
+ }
+
+ /**
+ * Check that IdempotentCacheVacuumizer doesn't trigger vacuumization
action after shutdown.
+ * <ol>
+ * <li>Start vacuumization triggering and verify that vacuumization
action was called.</li>
+ * <li>Shutdown the vacuumizer scheduler and check that action calls
were stopped.</li>
+ * <li>Start the vacuumizer and check that it doesn't take any
effect.</li>
+ * <li>Suspend vacuumization triggering and check that it doesn't take
any effect.</li>
+ * </ol>
+ *
+ * @throws Exception if Thread.sleep() was interrupted.
+ */
+ @Test
+ public void testIdempotentCacheVacuumizerAfterShutdown() throws Exception {
+ AtomicInteger touchCounter = new AtomicInteger(0);
+
+ IdempotentCacheVacuumizer vacuumizer = new IdempotentCacheVacuumizer(
+ "Node1",
+ scheduler,
+ ignored -> touchCounter.incrementAndGet(),
+ idempotentCacheTtlConfigurationValue,
+ clocService,
+ 0,
+ 1,
+ TimeUnit.MILLISECONDS
+ );
+
+ assertThat(vacuumizer.startAsync(new ComponentContext()),
willCompleteSuccessfully());
+
+ // Start vacuumization triggering and verify that vacuumization action
was called.
+ vacuumizer.startLocalVacuumizationTriggering();
+ assertTrue(waitForCondition(
+ () -> touchCounter.get() > 0,
+ TOUCH_COUNTER_CHANGE_TIMEOUT_MILLIS)
+ );
+
+ // Shutdown the vacuumizer scheduler and check that action calls were
stopped.
+ scheduler.shutdown();
+ int touchCounterAfterShutdown = touchCounter.get();
+ Thread.sleep(10);
+ assertTrue(touchCounter.get() == touchCounterAfterShutdown ||
touchCounter.get() == touchCounterAfterShutdown + 1);
+
+ // Start the vacuumizer and check that it doesn't take any effect.
+ vacuumizer.startLocalVacuumizationTriggering();
+ Thread.sleep(10);
+ assertTrue(touchCounter.get() == touchCounterAfterShutdown ||
touchCounter.get() == touchCounterAfterShutdown + 1);
+
+ // Suspend vacuumization triggering and check that it doesn't take any
effect.
+ vacuumizer.suspendLocalVacuumizationTriggering();
+ Thread.sleep(10);
+ assertTrue(touchCounter.get() == touchCounterAfterShutdown ||
touchCounter.get() == touchCounterAfterShutdown + 1);
+ }
+
+ /**
+ * Check that IdempotentCacheVacuumizer doesn't stop on an exception in the
vacuumization action and doesn't re-throw it to the outer environment
+ * but logs the exception with WARN.
+ *
+ * @throws Exception if Thread.sleep() was interrupted.
+ */
+ @Test
+ public void testIdempotentCacheExceptionHandling() throws Exception {
+ AtomicInteger touchCounter = new AtomicInteger(0);
+
+ Consumer<HybridTimestamp> vacuumizationActionStub = ignored -> {
+ touchCounter.incrementAndGet();
+ throw new IllegalStateException();
+ };
+
+ IdempotentCacheVacuumizer vacuumizer = new IdempotentCacheVacuumizer(
+ "Node1",
+ scheduler,
+ vacuumizationActionStub,
+ idempotentCacheTtlConfigurationValue,
+ clocService,
+ 0,
+ 1,
+ TimeUnit.MILLISECONDS
+ );
+
+ assertThat(vacuumizer.startAsync(new ComponentContext()),
willCompleteSuccessfully());
+
+ // Start vacuumization triggering and verify that vacuumization
actions were not stopped after exception.
+ vacuumizer.startLocalVacuumizationTriggering();
+
+ assertTrue(waitForCondition(
+ () -> touchCounter.get() > 1,
+ TOUCH_COUNTER_CHANGE_TIMEOUT_MILLIS)
+ );
+ }
+}
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
index da3cf1b836..7c5b0d18bb 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.metastorage.impl;
-import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toSet;
import static
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
@@ -196,9 +195,7 @@ public class ItIdempotentCommandCacheTest extends
IgniteAbstractTest {
clock,
topologyAwareRaftGroupServiceFactory,
new NoOpMetricManager(),
- metaStorageConfiguration,
- raftConfiguration.retryTimeout(),
- completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
+ metaStorageConfiguration
);
clockWaiter = new ClockWaiter(clusterService.nodeName(), clock);
@@ -415,10 +412,10 @@ public class ItIdempotentCommandCacheTest extends
IgniteAbstractTest {
), willCompleteSuccessfully());
}
- for (Node node : nodes) {
- node.metaStorageManager.evictIdempotentCommandsCache();
- }
+ HybridTimestamp evictionTimestamp =
HybridTimestamp.hybridTimestamp(nodes.get(0).clockService.nowLong()
+ - (raftConfiguration.retryTimeout().value() +
nodes.get(0).clockService.maxClockSkewMillis()));
+
assertThat(nodes.get(0).metaStorageManager.evictIdempotentCommandsCache(evictionTimestamp),
willCompleteSuccessfully());
// Run same idempotent command one more time and check that condition
**was** re-evaluated and not retrieved from the cache.
CompletableFuture<Object> commandProcessingResultFuture3 =
raftClient().run(idempotentCommand);
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
index 8275484bf8..3c8a05542c 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.metastorage.impl;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toList;
-import static
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
import static
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.clusterService;
import static
org.apache.ignite.internal.testframework.flow.TestFlowUtils.subscribeToList;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast;
@@ -146,9 +145,7 @@ public class ItMetaStorageManagerImplTest extends
IgniteAbstractTest {
clock,
topologyAwareRaftGroupServiceFactory,
new NoOpMetricManager(),
- metaStorageConfiguration,
- raftConfiguration.retryTimeout(),
- completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
+ metaStorageConfiguration
);
assertThat(
@@ -235,9 +232,7 @@ public class ItMetaStorageManagerImplTest extends
IgniteAbstractTest {
storage,
new HybridClockImpl(),
mock(TopologyAwareRaftGroupServiceFactory.class),
- new NoOpMetricManager(),
- raftConfiguration.retryTimeout(),
- completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
+ new NoOpMetricManager()
);
assertThat(metaStorageManager.stopAsync(new ComponentContext()),
willCompleteSuccessfully());
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
index d6b4941299..dd5d620b4b 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
@@ -18,9 +18,7 @@
package org.apache.ignite.internal.metastorage.impl;
import static java.util.concurrent.CompletableFuture.allOf;
-import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toSet;
-import static
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.revision;
import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
@@ -205,9 +203,7 @@ public abstract class
ItMetaStorageMultipleNodesAbstractTest extends IgniteAbstr
clock,
topologyAwareRaftGroupServiceFactory,
new NoOpMetricManager(),
- metaStorageConfiguration,
- raftConfiguration.retryTimeout(),
- completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
+ metaStorageConfiguration
);
deployWatchesFut = metaStorageManager.deployWatches();
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java
index 366a103993..ccf4eaffd0 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java
@@ -17,8 +17,6 @@
package org.apache.ignite.internal.metastorage.impl;
-import static java.util.concurrent.CompletableFuture.completedFuture;
-import static
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.nio.charset.StandardCharsets;
@@ -160,12 +158,7 @@ public class ItMetaStorageServicePersistenceTest extends
ItAbstractListenerSnaps
return s;
});
- return new MetaStorageListener(
- storage,
- new ClusterTimeImpl(nodeName, new IgniteSpinBusyLock(), new
HybridClockImpl()),
- raftConfiguration.retryTimeout(),
- completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
- );
+ return new MetaStorageListener(storage, new ClusterTimeImpl(nodeName,
new IgniteSpinBusyLock(), new HybridClockImpl()));
}
/** {@inheritDoc} */
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
index 3ab7ebf264..559e617ee7 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
@@ -17,10 +17,8 @@
package org.apache.ignite.internal.metastorage.impl;
-import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toUnmodifiableSet;
-import static
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.and;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.or;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.revision;
@@ -189,11 +187,8 @@ public class ItMetaStorageServiceTest extends
BaseIgniteAbstractTest {
private MetaStorageService metaStorageService;
- private RaftConfiguration raftConfiguration;
-
Node(ClusterService clusterService, RaftConfiguration
raftConfiguration, Path dataPath) {
this.clusterService = clusterService;
- this.raftConfiguration = raftConfiguration;
HybridClock clock = new HybridClockImpl();
@@ -239,12 +234,7 @@ public class ItMetaStorageServiceTest extends
BaseIgniteAbstractTest {
assert peer != null;
- var listener = new MetaStorageListener(
- mockStorage,
- clusterTime,
- raftConfiguration.retryTimeout(),
- completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
- );
+ var listener = new MetaStorageListener(mockStorage, clusterTime);
var raftNodeId = new RaftNodeId(MetastorageGroupId.INSTANCE, peer);
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
index 8bcaf65ecb..05b7c8f1b5 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
@@ -17,10 +17,8 @@
package org.apache.ignite.internal.metastorage.impl;
-import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
-import static
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
import static
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.findLocalAddresses;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
@@ -195,9 +193,7 @@ public class ItMetaStorageWatchTest extends
IgniteAbstractTest {
clock,
topologyAwareRaftGroupServiceFactory,
new NoOpMetricManager(),
- metaStorageConfiguration,
- raftConfiguration.retryTimeout(),
- completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
+ metaStorageConfiguration
);
components.add(metaStorageManager);
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java
index 50894e927e..e1ce03451d 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java
@@ -17,10 +17,8 @@
package org.apache.ignite.internal.metastorage.server.raft;
-import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.toSet;
-import static
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
import static
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.findLocalAddresses;
import static
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.waitForTopology;
import static org.apache.ignite.internal.raft.server.RaftGroupOptions.defaults;
@@ -83,6 +81,7 @@ import org.apache.ignite.raft.jraft.option.NodeOptions;
import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -221,6 +220,7 @@ public class ItMetaStorageRaftGroupTest extends
IgniteAbstractTest {
* @throws Exception If failed.
*/
@Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-22891")
public void testRangeNextWorksCorrectlyAfterLeaderChange() throws
Exception {
AtomicInteger replicatorStartedCounter = new AtomicInteger(0);
@@ -411,12 +411,7 @@ public class ItMetaStorageRaftGroupTest extends
IgniteAbstractTest {
metaStorageRaftSrv1.startRaftNode(
raftNodeId1,
membersConfiguration,
- new MetaStorageListener(
- mockStorage,
- mock(ClusterTimeImpl.class),
- raftConfiguration.retryTimeout(),
- completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
- ),
+ new MetaStorageListener(mockStorage,
mock(ClusterTimeImpl.class)),
defaults()
);
@@ -425,12 +420,7 @@ public class ItMetaStorageRaftGroupTest extends
IgniteAbstractTest {
metaStorageRaftSrv2.startRaftNode(
raftNodeId2,
membersConfiguration,
- new MetaStorageListener(
- mockStorage,
- mock(ClusterTimeImpl.class),
- raftConfiguration.retryTimeout(),
- completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
- ),
+ new MetaStorageListener(mockStorage,
mock(ClusterTimeImpl.class)),
defaults()
);
@@ -439,12 +429,7 @@ public class ItMetaStorageRaftGroupTest extends
IgniteAbstractTest {
metaStorageRaftSrv3.startRaftNode(
raftNodeId3,
membersConfiguration,
- new MetaStorageListener(
- mockStorage,
- mock(ClusterTimeImpl.class),
- raftConfiguration.retryTimeout(),
- completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
- ),
+ new MetaStorageListener(mockStorage,
mock(ClusterTimeImpl.class)),
defaults()
);
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/EvictIdempotentCommandsCacheCommand.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/EvictIdempotentCommandsCacheCommand.java
new file mode 100644
index 0000000000..21d6bfbacd
--- /dev/null
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/EvictIdempotentCommandsCacheCommand.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.command;
+
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.network.annotations.Transferable;
+
+/**
+ * Removes obsolete entries from both volatile and persistent idempotent
command cache.
+ */
+@Transferable(MetastorageCommandsMessageGroup.EVICT_IDEMPOTENT_COMMAND_CACHE)
+public interface EvictIdempotentCommandsCacheCommand extends
MetaStorageWriteCommand {
+ /** Cached entries older than given timestamp will be evicted. */
+ HybridTimestamp evictionTimestamp();
+}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsMessageGroup.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsMessageGroup.java
index 0967b84d5c..819d02717d 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsMessageGroup.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsMessageGroup.java
@@ -61,4 +61,7 @@ public interface MetastorageCommandsMessageGroup {
/** Message type for {@link SyncTimeCommand}. */
short SYNC_TIME = 70;
+
+ /** Message type for {@link EvictIdempotentCommandsCacheCommand}. */
+ short EVICT_IDEMPOTENT_COMMAND_CACHE = 71;
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageLeaderElectionListener.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageLeaderElectionListener.java
index 016f6c0038..4a493c228f 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageLeaderElectionListener.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageLeaderElectionListener.java
@@ -118,11 +118,12 @@ public class MetaStorageLeaderElectionListener implements
LeaderElectionListener
logicalTopologyService.addEventListener(logicalTopologyEventListener);
metaStorageSvcFut
- .thenAcceptBoth(metaStorageConfigurationFuture,
(service, metaStorageConfiguration) ->
- clusterTime.startSafeTimeScheduler(
- safeTime -> service.syncTime(safeTime,
term),
- metaStorageConfiguration
- ))
+ .thenAcceptBoth(metaStorageConfigurationFuture,
(service, metaStorageConfiguration) -> {
+ clusterTime.startSafeTimeScheduler(
+ safeTime -> service.syncTime(safeTime,
term),
+ metaStorageConfiguration
+ );
+ })
.whenComplete((v, e) -> {
if (e != null) {
LOG.error("Unable to start Idle Safe Time
scheduler", e);
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index 4a388c45fa..30ac53b403 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.metastorage.impl;
import static java.util.concurrent.CompletableFuture.failedFuture;
-import static java.util.concurrent.TimeUnit.MINUTES;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.IgniteUtils.cancelOrConsume;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
@@ -29,13 +28,9 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executors;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.LongSupplier;
-import org.apache.ignite.configuration.ConfigurationValue;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
import org.apache.ignite.internal.hlc.HybridClock;
@@ -72,7 +67,6 @@ import org.apache.ignite.internal.raft.RaftNodeId;
import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
import
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
import org.apache.ignite.internal.raft.service.RaftGroupService;
-import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
@@ -137,17 +131,10 @@ public class MetaStorageManagerImpl implements
MetaStorageManager {
private volatile MetaStorageConfiguration metaStorageConfiguration;
- private final ConfigurationValue<Long> idempotentCacheTtl;
-
- private final CompletableFuture<LongSupplier> maxClockSkewMillisFuture;
-
private volatile MetaStorageListener followerListener;
private volatile MetaStorageListener learnerListener;
- // TODO: https://issues.apache.org/jira/browse/IGNITE-19417 Remove, cache
eviction should be triggered by MS GC instead.
- private final ScheduledExecutorService idempotentCacheVacumizer;
-
private final List<ElectionListener> electionListeners = new
CopyOnWriteArrayList<>();
/**
@@ -160,7 +147,6 @@ public class MetaStorageManagerImpl implements
MetaStorageManager {
* @param storage Storage. This component owns this resource and will
manage its lifecycle.
* @param clock A hybrid logical clock.
* @param metricManager Metric manager.
- * @param maxClockSkewMillisFuture Future with maximum clock skew in
milliseconds.
*/
public MetaStorageManagerImpl(
ClusterService clusterService,
@@ -170,9 +156,7 @@ public class MetaStorageManagerImpl implements
MetaStorageManager {
KeyValueStorage storage,
HybridClock clock,
TopologyAwareRaftGroupServiceFactory
topologyAwareRaftGroupServiceFactory,
- MetricManager metricManager,
- ConfigurationValue<Long> idempotentCacheTtl,
- CompletableFuture<LongSupplier> maxClockSkewMillisFuture
+ MetricManager metricManager
) {
this.clusterService = clusterService;
this.raftMgr = raftMgr;
@@ -183,10 +167,6 @@ public class MetaStorageManagerImpl implements
MetaStorageManager {
this.metaStorageMetricSource = new
MetaStorageMetricSource(clusterTime);
this.topologyAwareRaftGroupServiceFactory =
topologyAwareRaftGroupServiceFactory;
this.metricManager = metricManager;
- this.idempotentCacheTtl = idempotentCacheTtl;
- this.maxClockSkewMillisFuture = maxClockSkewMillisFuture;
- this.idempotentCacheVacumizer =
Executors.newSingleThreadScheduledExecutor(
- NamedThreadFactory.create(clusterService.nodeName(),
"idempotent-cache-vacumizer", LOG));
}
/**
@@ -202,9 +182,7 @@ public class MetaStorageManagerImpl implements
MetaStorageManager {
HybridClock clock,
TopologyAwareRaftGroupServiceFactory
topologyAwareRaftGroupServiceFactory,
MetricManager metricManager,
- MetaStorageConfiguration configuration,
- ConfigurationValue<Long> idempotentCacheTtl,
- CompletableFuture<LongSupplier> maxClockSkewMillisFuture
+ MetaStorageConfiguration configuration
) {
this(
clusterService,
@@ -214,9 +192,7 @@ public class MetaStorageManagerImpl implements
MetaStorageManager {
storage,
clock,
topologyAwareRaftGroupServiceFactory,
- metricManager,
- idempotentCacheTtl,
- maxClockSkewMillisFuture
+ metricManager
);
configure(configuration);
@@ -338,12 +314,7 @@ public class MetaStorageManagerImpl implements
MetaStorageManager {
assert localMetaStorageConfiguration != null : "Meta Storage
configuration has not been set";
- followerListener = new MetaStorageListener(
- storage,
- clusterTime,
- idempotentCacheTtl,
- maxClockSkewMillisFuture
- );
+ followerListener = new MetaStorageListener(storage, clusterTime);
CompletableFuture<TopologyAwareRaftGroupService> raftServiceFuture =
raftMgr.startRaftGroupNodeAndWaitNodeReadyFuture(
new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer),
@@ -387,12 +358,7 @@ public class MetaStorageManagerImpl implements
MetaStorageManager {
assert localPeer != null;
- learnerListener = new MetaStorageListener(
- storage,
- clusterTime,
- idempotentCacheTtl,
- maxClockSkewMillisFuture
- );
+ learnerListener = new MetaStorageListener(storage, clusterTime);
return raftMgr.startRaftGroupNodeAndWaitNodeReadyFuture(
new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer),
@@ -457,8 +423,6 @@ public class MetaStorageManagerImpl implements
MetaStorageManager {
busyLock.block();
- idempotentCacheVacumizer.shutdownNow();
-
deployWatchesFuture.cancel(true);
recoveryFinishedFuture.cancel(true);
@@ -523,8 +487,6 @@ public class MetaStorageManagerImpl implements
MetaStorageManager {
MetaStorageManagerImpl.this.onRevisionApplied(revision);
}
});
-
-
idempotentCacheVacumizer.scheduleWithFixedDelay(this::evictIdempotentCommandsCache,
1, 1, MINUTES);
}))
.whenComplete((v, e) -> {
if (e == null) {
@@ -912,16 +874,20 @@ public class MetaStorageManagerImpl implements
MetaStorageManager {
}
/**
- * Removes obsolete entries from both volatile and persistent idempotent
command cache.
+ * Removes obsolete entries (those older than {@code evictionTimestamp}) from both volatile and persistent idempotent
command cache.
+ *
+ * @param evictionTimestamp Cached entries older than given timestamp will
be evicted.
+ * @return Pending operation future.
*/
- @Deprecated(forRemoval = true)
- // TODO: https://issues.apache.org/jira/browse/IGNITE-19417 cache eviction
should be triggered by MS GC instead.
- public void evictIdempotentCommandsCache() {
- if (followerListener != null) {
- followerListener.evictIdempotentCommandsCache();
- }
- if (learnerListener != null) {
- learnerListener.evictIdempotentCommandsCache();
+ public CompletableFuture<Void>
evictIdempotentCommandsCache(HybridTimestamp evictionTimestamp) {
+ if (!busyLock.enterBusy()) {
+ return failedFuture(new NodeStoppingException());
+ }
+
+ try {
+ return metaStorageSvcFut.thenCompose(svc ->
svc.evictIdempotentCommandsCache(evictionTimestamp));
+ } finally {
+ busyLock.leaveBusy();
}
}
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
index 86b323f4e8..3025895273 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import
org.apache.ignite.internal.metastorage.command.EvictIdempotentCommandsCacheCommand;
import org.apache.ignite.internal.metastorage.command.GetAllCommand;
import org.apache.ignite.internal.metastorage.command.GetCommand;
import
org.apache.ignite.internal.metastorage.command.GetCurrentRevisionCommand;
@@ -268,6 +269,22 @@ public class MetaStorageServiceImpl implements
MetaStorageService {
return context.raftService().run(cmd);
}
+ /**
+ * Removes obsolete entries from both volatile and persistent idempotent
command cache.
+ *
+ * @param evictionTimestamp Cached entries older than given timestamp will
be evicted.
+ * @return Pending operation future.
+ */
+ CompletableFuture<Void> evictIdempotentCommandsCache(HybridTimestamp
evictionTimestamp) {
+ EvictIdempotentCommandsCacheCommand
evictIdempotentCommandsCacheCommand = evictIdempotentCommandsCacheCommand(
+ context.commandsFactory(),
+ evictionTimestamp,
+ clusterTime.now()
+ );
+
+ return context.raftService().run(evictIdempotentCommandsCacheCommand);
+ }
+
@Override
public void close() {
context.close();
@@ -332,4 +349,19 @@ public class MetaStorageServiceImpl implements
MetaStorageService {
return
commandsFactory.removeAllCommand().keys(list).initiatorTime(ts).build();
}
+
+ /**
+ * Creates an {@link EvictIdempotentCommandsCacheCommand} for the given eviction timestamp.
+ *
+ * @param commandsFactory Commands factory.
+ * @param evictionTimestamp Cached entries older than given timestamp will
be evicted.
+ * @param ts Local time.
+ */
+ private EvictIdempotentCommandsCacheCommand
evictIdempotentCommandsCacheCommand(
+ MetaStorageCommandsFactory commandsFactory,
+ HybridTimestamp evictionTimestamp,
+ HybridTimestamp ts
+ ) {
+ return
commandsFactory.evictIdempotentCommandsCacheCommand().evictionTimestamp(evictionTimestamp).initiatorTime(ts).build();
+ }
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
index 3a9ca2a993..3159069f15 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
@@ -270,6 +270,14 @@ public interface KeyValueStorage extends ManuallyCloseable
{
*/
HybridTimestamp timestampByRevision(long revision);
+ /**
+ * Looks up a revision with a timestamp less than or equal to the given timestamp.
+ *
+ * @param timestamp Timestamp by which to do a lookup.
+ * @return Revision with a timestamp less than or equal to the given one, or -1 if there is no
such revision.
+ */
+ long revisionByTimestamp(HybridTimestamp timestamp);
+
/**
* Sets the revision listener. This is needed only for the recovery, after
that listener must be set to {@code null}.
* {@code null} means that we no longer must be notified of revision
updates for recovery, because recovery is finished.
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
index d58efd4256..713a85951f 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
@@ -977,23 +977,12 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
try (WriteBatch batch = new WriteBatch()) {
- byte[] tsBytes = hybridTsToArray(lowWatermark);
- long maxRevision;
-
// Find a revision with timestamp lesser or equal to the watermark.
- try (RocksIterator rocksIterator = tsToRevision.newIterator()) {
- rocksIterator.seekForPrev(tsBytes);
-
- RocksUtils.checkIterator(rocksIterator);
-
- byte[] tsValue = rocksIterator.value();
-
- if (tsValue.length == 0) {
- // Nothing to compact yet.
- return;
- }
+ long maxRevision = revisionByTimestamp(lowWatermark);
- maxRevision = bytesToLong(tsValue);
+ if (maxRevision == -1) {
+ // Nothing to compact yet.
+ return;
}
try (RocksIterator iterator = index.newIterator()) {
@@ -1520,6 +1509,26 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
}
}
+ @Override
+ public long revisionByTimestamp(HybridTimestamp timestamp) {
+ byte[] tsBytes = hybridTsToArray(timestamp);
+
+ // Find a revision with timestamp less than or equal to the given timestamp.
+ try (RocksIterator rocksIterator = tsToRevision.newIterator()) {
+ rocksIterator.seekForPrev(tsBytes);
+
+ RocksUtils.checkIterator(rocksIterator);
+
+ byte[] tsValue = rocksIterator.value();
+
+ if (tsValue.length == 0) {
+ return -1;
+ }
+
+ return bytesToLong(tsValue);
+ }
+ }
+
private void finishReplay() {
// Take the lock to drain the event cache and prevent new events from
being cached. Since event notification is asynchronous,
// this lock shouldn't be held for long.
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
index 5659e37227..8e4780e58b 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
@@ -26,10 +26,7 @@ import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
-import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
-import java.util.function.LongSupplier;
-import org.apache.ignite.configuration.ConfigurationValue;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.command.GetAllCommand;
@@ -67,17 +64,10 @@ public class MetaStorageListener implements
RaftGroupListener, BeforeApplyHandle
*/
public MetaStorageListener(
KeyValueStorage storage,
- ClusterTimeImpl clusterTime,
- ConfigurationValue<Long> idempotentCacheTtl,
- CompletableFuture<LongSupplier> maxClockSkewMillisFuture
+ ClusterTimeImpl clusterTime
) {
this.storage = storage;
- this.writeHandler = new MetaStorageWriteHandler(
- storage,
- clusterTime,
- idempotentCacheTtl,
- maxClockSkewMillisFuture
- );
+ this.writeHandler = new MetaStorageWriteHandler(storage, clusterTime);
}
@Override
@@ -191,13 +181,4 @@ public class MetaStorageListener implements
RaftGroupListener, BeforeApplyHandle
@Override
public void onShutdown() {
}
-
- /**
- * Removes obsolete entries from both volatile and persistent idempotent
command cache.
- */
- @Deprecated(forRemoval = true)
- // TODO: https://issues.apache.org/jira/browse/IGNITE-19417 cache eviction
should be triggered by MS GC instead.
- public void evictIdempotentCommandsCache() {
- writeHandler.evictIdempotentCommandsCache();
- }
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
index bc226520f3..e63b29fff5 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
@@ -28,17 +28,15 @@ import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.LongSupplier;
-import org.apache.ignite.configuration.ConfigurationValue;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metastorage.CommandId;
import org.apache.ignite.internal.metastorage.Entry;
+import
org.apache.ignite.internal.metastorage.command.EvictIdempotentCommandsCacheCommand;
import org.apache.ignite.internal.metastorage.command.IdempotentCommand;
import org.apache.ignite.internal.metastorage.command.InvokeCommand;
import org.apache.ignite.internal.metastorage.command.MetaStorageWriteCommand;
@@ -69,7 +67,6 @@ import
org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl;
import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.WriteCommand;
import org.apache.ignite.internal.raft.service.CommandClosure;
-import org.apache.ignite.internal.util.ArrayUtils;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.Cursor;
import org.jetbrains.annotations.Nullable;
@@ -88,22 +85,14 @@ public class MetaStorageWriteHandler {
private final KeyValueStorage storage;
private final ClusterTimeImpl clusterTime;
- private final Map<CommandId, IdempotentCommandCachedResult>
idempotentCommandCache = new ConcurrentHashMap<>();
-
- private final ConfigurationValue<Long> idempotentCacheTtl;
-
- private final CompletableFuture<LongSupplier> maxClockSkewMillisFuture;
+ private final Map<CommandId, @Nullable Serializable>
idempotentCommandCache = new ConcurrentHashMap<>();
MetaStorageWriteHandler(
KeyValueStorage storage,
- ClusterTimeImpl clusterTime,
- ConfigurationValue<Long> idempotentCacheTtl,
- CompletableFuture<LongSupplier> maxClockSkewMillisFuture
+ ClusterTimeImpl clusterTime
) {
this.storage = storage;
this.clusterTime = clusterTime;
- this.idempotentCacheTtl = idempotentCacheTtl;
- this.maxClockSkewMillisFuture = maxClockSkewMillisFuture;
}
/**
@@ -118,10 +107,10 @@ public class MetaStorageWriteHandler {
IdempotentCommand idempotentCommand = ((IdempotentCommand)
command);
CommandId commandId = idempotentCommand.id();
- IdempotentCommandCachedResult cachedResult =
idempotentCommandCache.get(commandId);
+ Serializable cachedResult = idempotentCommandCache.get(commandId);
if (cachedResult != null) {
- clo.result(cachedResult.result);
+ clo.result(cachedResult);
return;
} else {
@@ -215,6 +204,11 @@ public class MetaStorageWriteHandler {
} else if (command instanceof SyncTimeCommand) {
storage.advanceSafeTime(command.safeTime());
+ clo.result(null);
+ } else if (command instanceof EvictIdempotentCommandsCacheCommand) {
+ EvictIdempotentCommandsCacheCommand cmd =
(EvictIdempotentCommandsCacheCommand) command;
+ evictIdempotentCommandsCache(cmd.evictionTimestamp(), opTime);
+
clo.result(null);
}
}
@@ -352,8 +346,6 @@ public class MetaStorageWriteHandler {
byte[] keyTo = storage.nextKey(IDEMPOTENT_COMMAND_PREFIX_BYTES);
Cursor<Entry> cursor = storage.range(keyFrom, keyTo);
- // It's fine to lose original command start time - in that case we
will store the entry a little bit longer that necessary.
- HybridTimestamp now = clusterTime.now();
try (cursor) {
for (Entry entry : cursor) {
@@ -368,7 +360,7 @@ public class MetaStorageWriteHandler {
result =
MSG_FACTORY.statementResult().result(ByteBuffer.wrap(entry.value())).build();
}
- idempotentCommandCache.put(commandId, new
IdempotentCommandCachedResult(result, now));
+ idempotentCommandCache.put(commandId, result);
}
}
}
@@ -376,48 +368,45 @@ public class MetaStorageWriteHandler {
/**
* Removes obsolete entries from both volatile and persistent idempotent
command cache.
+ *
+ * @param evictionTimestamp Cached entries older than given timestamp will
be evicted.
+ * @param operationTimestamp Command operation timestamp.
*/
- // TODO: https://issues.apache.org/jira/browse/IGNITE-19417 Call on meta
storage compaction.
- void evictIdempotentCommandsCache() {
- HybridTimestamp cleanupTimestamp = clusterTime.now();
- LOG.info("Idempotent command cache cleanup started
[cleanupTimestamp={}].", cleanupTimestamp);
-
- maxClockSkewMillisFuture.thenAccept(maxClockSkewMillis -> {
- List<CommandId> commandIdsToRemove =
idempotentCommandCache.entrySet().stream()
- .filter(entry ->
entry.getValue().commandStartTime.getPhysical()
- <= cleanupTimestamp.getPhysical() -
(idempotentCacheTtl.value() + maxClockSkewMillis.getAsLong()))
- .map(Map.Entry::getKey)
+ void evictIdempotentCommandsCache(HybridTimestamp evictionTimestamp,
HybridTimestamp operationTimestamp) {
+ LOG.info("Idempotent command cache cleanup started
[evictionTimestamp={}].", evictionTimestamp);
+
+ long obsoleteRevision = storage.revisionByTimestamp(evictionTimestamp);
+
+ if (obsoleteRevision != -1) {
+ byte[] keyFrom = IDEMPOTENT_COMMAND_PREFIX_BYTES;
+ byte[] keyTo = storage.nextKey(IDEMPOTENT_COMMAND_PREFIX_BYTES);
+
+ // NOTE(review): the result of storage.range() (presumably a Cursor) is streamed without being closed,
+ // while another range() call in this class wraps its cursor in try-with-resources - confirm whether
+ // it should be closed here as well.
+ List<byte[]> evictionCandidateKeys = storage.range(keyFrom, keyTo,
obsoleteRevision).stream()
+ // Not sure whether it's possible to retrieve an empty entry
here, thus !entry.empty() was added just in case.
+ .filter(entry -> !entry.tombstone() && !entry.empty())
+ .map(Entry::key)
.collect(toList());
- if (!commandIdsToRemove.isEmpty()) {
- List<byte[]> commandIdStorageKeys = commandIdsToRemove.stream()
- .map(commandId -> ArrayUtils.concat(new byte[]{},
ByteUtils.toBytes(commandId)))
- .collect(toList());
+ // TODO https://issues.apache.org/jira/browse/IGNITE-22828
+ evictionCandidateKeys.forEach(evictionCandidateKeyBytes -> {
+ CommandId commandId = ByteUtils.fromBytes(
+ evictionCandidateKeyBytes,
+ IDEMPOTENT_COMMAND_PREFIX_BYTES.length,
+ evictionCandidateKeyBytes.length
+ );
- storage.removeAll(commandIdStorageKeys, null);
+ idempotentCommandCache.remove(commandId);
+ });
-
commandIdsToRemove.forEach(idempotentCommandCache.keySet()::remove);
- }
+ storage.removeAll(evictionCandidateKeys, operationTimestamp);
- LOG.info("Idempotent command cache cleanup finished
[cleanupTimestamp={}, cleanupCompletionTimestamp={},"
+ LOG.info("Idempotent command cache cleanup finished
[evictionTimestamp={}, cleanupCompletionTimestamp={},"
+ " removedEntriesCount={}, cacheSize={}].",
- cleanupTimestamp,
+ evictionTimestamp,
clusterTime.now(),
- commandIdsToRemove.size(),
+ evictionCandidateKeys.size(),
idempotentCommandCache.size()
);
- });
- }
-
- private static class IdempotentCommandCachedResult {
- @Nullable
- final Serializable result;
-
- final HybridTimestamp commandStartTime;
-
- IdempotentCommandCachedResult(@Nullable Serializable result,
HybridTimestamp commandStartTime) {
- this.result = result;
- this.commandStartTime = commandStartTime;
}
}
@@ -451,7 +440,7 @@ public class MetaStorageWriteHandler {
// Exceptions are not cached.
if (!(res instanceof Throwable)) {
- idempotentCommandCache.put(command.id(), new
IdempotentCommandCachedResult(res, command.initiatorTime()));
+ idempotentCommandCache.put(command.id(), res);
}
closure.result(res);
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/IdempotentCommandCacheTest.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/IdempotentCommandCacheTest.java
index b79593ad5e..f41319401a 100644
---
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/IdempotentCommandCacheTest.java
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/IdempotentCommandCacheTest.java
@@ -17,8 +17,6 @@
package org.apache.ignite.internal.metastorage.impl;
-import static java.util.concurrent.CompletableFuture.completedFuture;
-import static
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
import static org.apache.ignite.internal.metastorage.dsl.Operations.ops;
import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
@@ -36,7 +34,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
-import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.lang.ByteArray;
@@ -52,7 +49,6 @@ import
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStora
import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl;
import org.apache.ignite.internal.raft.WriteCommand;
-import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.raft.service.CommandClosure;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
@@ -81,18 +77,10 @@ public class IdempotentCommandCacheTest extends
BaseIgniteAbstractTest {
private final CommandIdGenerator commandIdGenerator = new
CommandIdGenerator(() -> UUID.randomUUID().toString());
- @InjectConfiguration
- private RaftConfiguration raftConfiguration;
-
@BeforeEach
public void setUp() {
storage = new SimpleInMemoryKeyValueStorage(NODE_NAME);
- metaStorageListener = new MetaStorageListener(
- storage,
- new ClusterTimeImpl(NODE_NAME, new IgniteSpinBusyLock(),
clock),
- raftConfiguration.retryTimeout(),
- completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
- );
+ metaStorageListener = new MetaStorageListener(storage, new
ClusterTimeImpl(NODE_NAME, new IgniteSpinBusyLock(), clock));
}
@Test
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java
index c548f0d734..d6fc9c0431 100644
---
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.metastorage.impl;
import static java.util.concurrent.CompletableFuture.completedFuture;
-import static
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -44,7 +43,6 @@ import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.raft.RaftManager;
import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
import
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
-import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
@@ -58,9 +56,6 @@ public class MetaStorageDeployWatchesCorrectnessTest extends
IgniteAbstractTest
@InjectConfiguration
private static MetaStorageConfiguration metaStorageConfiguration;
- @InjectConfiguration
- private static RaftConfiguration raftConfiguration;
-
/**
* Returns a stream with test arguments.
*
@@ -92,9 +87,7 @@ public class MetaStorageDeployWatchesCorrectnessTest extends
IgniteAbstractTest
clock,
mock(TopologyAwareRaftGroupServiceFactory.class),
new NoOpMetricManager(),
- metaStorageConfiguration,
- raftConfiguration.retryTimeout(),
- completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
+ metaStorageConfiguration
),
StandaloneMetaStorageManager.create()
);
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java
index e998f62024..14da02ec7c 100644
---
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.metastorage.impl;
import static java.util.concurrent.CompletableFuture.completedFuture;
-import static
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
@@ -50,7 +49,6 @@ import org.apache.ignite.internal.network.TopologyService;
import
org.apache.ignite.internal.network.serialization.MessageSerializationRegistry;
import org.apache.ignite.internal.raft.RaftManager;
import
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
-import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.network.NodeMetadata;
@@ -67,9 +65,6 @@ public class MetaStorageManagerRecoveryTest extends
BaseIgniteAbstractTest {
@InjectConfiguration
private static MetaStorageConfiguration metaStorageConfiguration;
- @InjectConfiguration
- private static RaftConfiguration raftConfiguration;
-
private MetaStorageManagerImpl metaStorageManager;
private KeyValueStorage kvs;
@@ -94,9 +89,7 @@ public class MetaStorageManagerRecoveryTest extends
BaseIgniteAbstractTest {
clock,
mock(TopologyAwareRaftGroupServiceFactory.class),
new NoOpMetricManager(),
- metaStorageConfiguration,
- raftConfiguration.retryTimeout(),
- completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
+ metaStorageConfiguration
);
}
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
index d6b695729a..a27ce6062e 100644
---
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
@@ -19,6 +19,9 @@ package org.apache.ignite.internal.metastorage.server;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.function.Function.identity;
+import static org.apache.ignite.internal.hlc.HybridTimestamp.MAX_VALUE;
+import static org.apache.ignite.internal.hlc.HybridTimestamp.MIN_VALUE;
+import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
import static org.apache.ignite.internal.metastorage.dsl.Operations.ops;
import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
import static org.apache.ignite.internal.metastorage.dsl.Operations.remove;
@@ -2093,6 +2096,63 @@ public abstract class BasicOperationsKeyValueStorageTest
extends AbstractKeyValu
verify(mockCallback, never()).onRevisionApplied(anyLong());
}
+ @Test
+ public void testRevisionByTimestamp() {
+ // Verify that in case of empty storage -1 will be returned.
+ assertEquals(-1, storage.revisionByTimestamp(MIN_VALUE));
+ assertEquals(-1, storage.revisionByTimestamp(hybridTimestamp(2)));
+ assertEquals(-1, storage.revisionByTimestamp(hybridTimestamp(5)));
+ assertEquals(-1, storage.revisionByTimestamp(hybridTimestamp(7)));
+ assertEquals(-1, storage.revisionByTimestamp(hybridTimestamp(10)));
+ assertEquals(-1, storage.revisionByTimestamp(hybridTimestamp(12)));
+ assertEquals(-1, storage.revisionByTimestamp(hybridTimestamp(15)));
+ assertEquals(-1, storage.revisionByTimestamp(hybridTimestamp(17)));
+ assertEquals(-1, storage.revisionByTimestamp(MAX_VALUE));
+
+ // Populate storage with some data in order to have following revision
to timestamp mapping:
+ // 1 -> 5
+ // 2 -> 10
+ // 3 -> 15
+ {
+ storage.put(key(1), keyValue(1, 1), hybridTimestamp(5));
+ assertEquals(1, storage.revision());
+
+ storage.put(key(1), keyValue(1, 1), hybridTimestamp(10));
+ assertEquals(2, storage.revision());
+
+ storage.put(key(2), keyValue(2, 2), hybridTimestamp(15));
+ assertEquals(3, storage.revision());
+ }
+
+ // Check revisionByTimestamp()
+ {
+ assertEquals(-1, storage.revisionByTimestamp(MIN_VALUE));
+
+ // Timestamp 2 precedes the earliest revision timestamp (5), so -1 is
expected.
+ assertEquals(-1, storage.revisionByTimestamp(hybridTimestamp(2)));
+
+ // Exact matching 1 -> 5
+ assertEquals(1, storage.revisionByTimestamp(hybridTimestamp(5)));
+
+ // There's no revision associated with 7, so closest left one is
expected.
+ assertEquals(1, storage.revisionByTimestamp(hybridTimestamp(7)));
+
+ // Exact matching 2 -> 10
+ assertEquals(2, storage.revisionByTimestamp(hybridTimestamp(10)));
+
+ // There's no revision associated with 12, so closest left one is
expected.
+ assertEquals(2, storage.revisionByTimestamp(hybridTimestamp(12)));
+
+ // Exact matching 3 -> 15
+ assertEquals(3, storage.revisionByTimestamp(hybridTimestamp(15)));
+
+ // There's no revision associated with 17, so closest left one is
expected.
+ assertEquals(3, storage.revisionByTimestamp(hybridTimestamp(17)));
+
+ assertEquals(3, storage.revisionByTimestamp(MAX_VALUE));
+ }
+ }
+
private CompletableFuture<Void> watchExact(
byte[] key, long revision, int expectedNumCalls,
BiConsumer<WatchEvent, Integer> testCondition
) {
@@ -2160,19 +2220,19 @@ public abstract class
BasicOperationsKeyValueStorageTest extends AbstractKeyValu
}
void putToMs(byte[] key, byte[] value) {
- storage.put(key, value, HybridTimestamp.MIN_VALUE);
+ storage.put(key, value, MIN_VALUE);
}
private void putAllToMs(List<byte[]> keys, List<byte[]> values) {
- storage.putAll(keys, values, HybridTimestamp.MIN_VALUE);
+ storage.putAll(keys, values, MIN_VALUE);
}
private void removeFromMs(byte[] key) {
- storage.remove(key, HybridTimestamp.MIN_VALUE);
+ storage.remove(key, MIN_VALUE);
}
private void removeAllFromMs(List<byte[]> keys) {
- storage.removeAll(keys, HybridTimestamp.MIN_VALUE);
+ storage.removeAll(keys, MIN_VALUE);
}
private boolean invokeOnMs(Condition condition, Collection<Operation>
success, Collection<Operation> failure) {
@@ -2180,7 +2240,7 @@ public abstract class BasicOperationsKeyValueStorageTest
extends AbstractKeyValu
condition,
success,
failure,
- HybridTimestamp.MIN_VALUE,
+ MIN_VALUE,
new CommandIdGenerator(() ->
UUID.randomUUID().toString()).newId()
);
}
@@ -2188,7 +2248,7 @@ public abstract class BasicOperationsKeyValueStorageTest
extends AbstractKeyValu
private StatementResult invokeOnMs(If iif) {
return storage.invoke(
iif,
- HybridTimestamp.MIN_VALUE,
+ MIN_VALUE,
new CommandIdGenerator(() ->
UUID.randomUUID().toString()).newId()
);
}
diff --git
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
index 92623da88d..3cd5ed52fc 100644
---
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
+++
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.metastorage.impl;
import static java.util.Collections.singleton;
import static java.util.concurrent.CompletableFuture.completedFuture;
-import static
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -29,7 +28,6 @@ import java.io.Serializable;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.function.LongSupplier;
import org.apache.ignite.configuration.ConfigurationValue;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
@@ -108,9 +106,7 @@ public class StandaloneMetaStorageManager extends
MetaStorageManagerImpl {
keyValueStorage,
mock(TopologyAwareRaftGroupServiceFactory.class),
mockConfiguration(),
- clock,
- mockRaftConfiguration().retryTimeout(),
- completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
+ clock
);
}
@@ -131,9 +127,7 @@ public class StandaloneMetaStorageManager extends
MetaStorageManagerImpl {
KeyValueStorage storage,
TopologyAwareRaftGroupServiceFactory raftServiceFactory,
MetaStorageConfiguration configuration,
- HybridClock clock,
- ConfigurationValue<Long> idempotentCacheTtl,
- CompletableFuture<LongSupplier> maxClockSkewMillisFuture
+ HybridClock clock
) {
super(
clusterService,
@@ -144,9 +138,7 @@ public class StandaloneMetaStorageManager extends
MetaStorageManagerImpl {
clock,
raftServiceFactory,
new NoOpMetricManager(),
- configuration,
- idempotentCacheTtl,
- maxClockSkewMillisFuture
+ configuration
);
}
diff --git
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index 406510ffb3..3822585e73 100644
---
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -391,6 +391,20 @@ public class SimpleInMemoryKeyValueStorage implements
KeyValueStorage {
}
}
+ @Override
+ public long revisionByTimestamp(HybridTimestamp timestamp) {
+ synchronized (mux) {
+ Map.Entry<Long, Long> revisionEntry =
tsToRevMap.floorEntry(timestamp.longValue());
+
+ if (revisionEntry == null) {
+ // No revision was recorded at or before the given timestamp.
+ return -1;
+ }
+
+ return revisionEntry.getValue();
+ }
+ }
+
@Override
public void setRecoveryRevisionListener(@Nullable LongConsumer listener) {
synchronized (mux) {
@@ -493,15 +507,12 @@ public class SimpleInMemoryKeyValueStorage implements
KeyValueStorage {
NavigableMap<Long, NavigableMap<byte[], Value>> compactedRevsIdx =
new TreeMap<>();
- Map.Entry<Long, Long> revisionEntry =
tsToRevMap.floorEntry(lowWatermark.longValue());
+ long maxRevision = revisionByTimestamp(lowWatermark);
- if (revisionEntry == null) {
- // Nothing to compact yet.
+ if (maxRevision == -1) {
return;
}
- long maxRevision = revisionEntry.getValue();
-
keysIdx.forEach((key, revs) -> compactForKey(key, revs,
compactedKeysIdx, compactedRevsIdx, maxRevision));
keysIdx = compactedKeysIdx;
diff --git
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
index 5042c2621b..50eb3d999f 100644
---
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
+++
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
@@ -33,7 +33,6 @@ import static
org.apache.ignite.internal.partition.replicator.PartitionReplicaLi
import static org.apache.ignite.internal.sql.SqlCommon.DEFAULT_SCHEMA_NAME;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
-import static
org.apache.ignite.internal.testframework.TestIgnitionManager.DEFAULT_MAX_CLOCK_SKEW_MS;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
import static org.apache.ignite.internal.util.ByteUtils.toByteArray;
@@ -187,6 +186,7 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
@@ -524,7 +524,7 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
);
}
- @Test
+ @RepeatedTest(100)
@Disabled("https://issues.apache.org/jira/browse/IGNITE-22858")
void testAlterFilterTrigger(TestInfo testInfo) throws Exception {
startNodes(testInfo, 3);
@@ -960,9 +960,7 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
hybridClock,
topologyAwareRaftGroupServiceFactory,
new NoOpMetricManager(),
- metaStorageConfiguration,
- raftConfiguration.retryTimeout(),
- completedFuture(() -> DEFAULT_MAX_CLOCK_SKEW_MS)
+ metaStorageConfiguration
) {
@Override
public CompletableFuture<Boolean> invoke(
diff --git
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
index 2282eb86f5..24f7091e3e 100644
---
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
+++
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
@@ -266,9 +266,7 @@ public class MultiActorPlacementDriverTest extends
BasePlacementDriverTest {
nodeClock,
topologyAwareRaftGroupServiceFactory,
new NoOpMetricManager(),
- metaStorageConfiguration,
- raftConfiguration.retryTimeout(),
- completedFuture(clockService::maxClockSkewMillis)
+ metaStorageConfiguration
);
if (this.metaStorageManager == null) {
diff --git
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
index f586f89e0b..6a6f0a0edc 100644
---
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
+++
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
@@ -203,9 +203,7 @@ public class PlacementDriverManagerTest extends
BasePlacementDriverTest {
nodeClock,
topologyAwareRaftGroupServiceFactory,
new NoOpMetricManager(),
- metaStorageConfiguration,
- raftConfiguration.retryTimeout(),
- completedFuture(clockService::maxClockSkewMillis)
+ metaStorageConfiguration
);
placementDriverManager = new PlacementDriverManager(
diff --git
a/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/service/ItAbstractListenerSnapshotTest.java
b/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/service/ItAbstractListenerSnapshotTest.java
index 2913279d95..01e8aebd08 100644
---
a/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/service/ItAbstractListenerSnapshotTest.java
+++
b/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/service/ItAbstractListenerSnapshotTest.java
@@ -107,7 +107,7 @@ public abstract class ItAbstractListenerSnapshotTest<T
extends RaftGroupListener
private ScheduledExecutorService executor;
@InjectConfiguration
- protected RaftConfiguration raftConfiguration;
+ private RaftConfiguration raftConfiguration;
/**
* Create executor for raft group services.
diff --git a/modules/runner/build.gradle b/modules/runner/build.gradle
index 5442a588b3..5e66356dc3 100644
--- a/modules/runner/build.gradle
+++ b/modules/runner/build.gradle
@@ -56,6 +56,7 @@ dependencies {
implementation project(':ignite-raft-api')
implementation project(':ignite-raft')
implementation project(':ignite-metastorage')
+ implementation project(':ignite-metastorage-cache')
implementation project(':ignite-affinity')
implementation project(':ignite-table')
implementation project(':ignite-index')
@@ -147,6 +148,7 @@ dependencies {
exclude group: 'com.fasterxml.jackson.core', module: 'jackson-databind'
}
integrationTestImplementation project(':ignite-metastorage')
+ integrationTestImplementation project(':ignite-metastorage-cache')
integrationTestImplementation project(':ignite-metrics')
integrationTestImplementation project(':ignite-table')
integrationTestImplementation project(':ignite-transactions')
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
index 48884adb1d..a2ad98eb88 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
@@ -18,9 +18,7 @@
package org.apache.ignite.internal.configuration;
import static java.util.concurrent.CompletableFuture.allOf;
-import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toUnmodifiableList;
-import static
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
@@ -212,9 +210,7 @@ public class ItDistributedConfigurationPropertiesTest
extends BaseIgniteAbstract
clock,
topologyAwareRaftGroupServiceFactory,
new NoOpMetricManager(),
- metaStorageConfiguration,
- raftConfiguration.retryTimeout(),
- completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
+ metaStorageConfiguration
);
deployWatchesFut = metaStorageManager.deployWatches();
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
index cff23967a3..4ca6f6c31d 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
@@ -17,8 +17,6 @@
package org.apache.ignite.internal.configuration.storage;
-import static java.util.concurrent.CompletableFuture.completedFuture;
-import static
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
@@ -184,9 +182,7 @@ public class ItDistributedConfigurationStorageTest extends
BaseIgniteAbstractTes
clock,
topologyAwareRaftGroupServiceFactory,
new NoOpMetricManager(),
- metaStorageConfiguration,
- raftConfiguration.retryTimeout(),
- completedFuture(() -> TEST_MAX_CLOCK_SKEW_MILLIS)
+ metaStorageConfiguration
);
deployWatchesFut = metaStorageManager.deployWatches();
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 7064e297a9..0637c23cc6 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -438,9 +438,7 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
hybridClock,
topologyAwareRaftGroupServiceFactory,
metricManager,
- metaStorageConfiguration,
- raftConfiguration.retryTimeout(),
- maxClockSkewFuture
+ metaStorageConfiguration
) {
@Override
public CompletableFuture<Boolean> invoke(Condition condition, Collection<Operation> success, Collection<Operation> failure) {
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 9ce9ef6e3a..28bcdc769e 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.app;
import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.REBALANCE_SCHEDULER_POOL_SIZE;
import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ;
@@ -134,6 +135,7 @@ import org.apache.ignite.internal.lowwatermark.event.ChangeLowWatermarkEventPara
import org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.cache.IdempotentCacheVacuumizer;
import org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration;
import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
import org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
@@ -321,6 +323,9 @@ public class IgniteImpl implements Ignite {
/** Configuration manager that handles cluster (distributed) configuration. */
private final ConfigurationManager clusterCfgMgr;
+ /** Idempotent cache vacuumizer. */
+ private final IdempotentCacheVacuumizer idempotentCacheVacuumizer;
+
/** Cluster initializer. */
private final ClusterInitializer clusterInitializer;
@@ -588,8 +593,6 @@ public class IgniteImpl implements Ignite {
raftGroupEventsClientListener
);
- CompletableFuture<LongSupplier> maxClockSkewMillisFuture = new CompletableFuture<>();
-
metaStorageMgr = new MetaStorageManagerImpl(
clusterSvc,
cmgMgr,
@@ -598,9 +601,7 @@ public class IgniteImpl implements Ignite {
new RocksDbKeyValueStorage(name, workDir.resolve(METASTORAGE_DB_PATH), failureProcessor),
clock,
topologyAwareRaftGroupServiceFactory,
- metricManager,
- raftConfiguration.retryTimeout(),
- maxClockSkewMillisFuture
+ metricManager
);
this.cfgStorage = new DistributedConfigurationStorage(name, metaStorageMgr);
@@ -622,7 +623,18 @@ public class IgniteImpl implements Ignite {
clockService = new ClockServiceImpl(clock, clockWaiter, new SameValueLongSupplier(() -> schemaSyncConfig.maxClockSkew().value()));
- maxClockSkewMillisFuture.complete(clockService::maxClockSkewMillis);
+ idempotentCacheVacuumizer = new IdempotentCacheVacuumizer(
+ name,
+ threadPoolsManager.commonScheduler(),
+ metaStorageMgr::evictIdempotentCommandsCache,
+ nodeCfgMgr.configurationRegistry().getConfiguration(RaftConfiguration.KEY).retryTimeout(),
+ clockService,
+ 1,
+ 1,
+ MINUTES
+ );
+
+ metaStorageMgr.addElectionListener(idempotentCacheVacuumizer);
Consumer<LongFunction<CompletableFuture<?>>> registry = c -> metaStorageMgr.registerRevisionUpdateListener(c::apply);
@@ -1114,7 +1126,6 @@ public class IgniteImpl implements Ignite {
clusterSvc.updateMetadata(
new NodeMetadata(restComponent.hostName(), restComponent.httpPort(), restComponent.httpsPort()));
-
} catch (Throwable e) {
startupExecutor.shutdownNow();
@@ -1168,6 +1179,7 @@ public class IgniteImpl implements Ignite {
catalogCompactionRunner,
indexMetaStorage,
clusterCfgMgr,
+ idempotentCacheVacuumizer,
authenticationManager,
placementDriverMgr,
metricManager,
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index b61fb76b24..fcc7294998 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.rebalance;
import static java.util.Collections.reverse;
import static java.util.concurrent.CompletableFuture.allOf;
-import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toSet;
@@ -1174,9 +1173,7 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest {
hybridClock,
topologyAwareRaftGroupServiceFactory,
metricManager,
- metaStorageConfiguration,
- raftConfiguration.retryTimeout(),
- completedFuture(() -> DEFAULT_MAX_CLOCK_SKEW_MS)
+ metaStorageConfiguration
);
placementDriver = new TestPlacementDriver(() -> PRIMARY_FILTER.apply(clusterService.topologyService().allMembers()));
diff --git a/settings.gradle b/settings.gradle
index 00016e3ec5..a5c995b2b1 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -54,6 +54,7 @@ include(':ignite-runner')
include(':ignite-index')
include(':ignite-metastorage-api')
include(':ignite-metastorage')
+include(':ignite-metastorage-cache')
include(':ignite-rest-api')
include(':ignite-storage-rocksdb')
include(':ignite-configuration-annotation-processor')
@@ -126,6 +127,7 @@ project(":ignite-runner").projectDir = file('modules/runner')
project(":ignite-index").projectDir = file('modules/index')
project(":ignite-metastorage-api").projectDir = file('modules/metastorage-api')
project(":ignite-metastorage").projectDir = file('modules/metastorage')
+project(":ignite-metastorage-cache").projectDir = file('modules/metastorage-cache')
project(":ignite-rest-api").projectDir = file('modules/rest-api')
project(":ignite-storage-rocksdb").projectDir = file('modules/storage-rocksdb')
project(":ignite-configuration-annotation-processor").projectDir = file('modules/configuration-annotation-processor')