This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new c04828abd1 IGNITE-23294 Handle Metastorage leader change case for
Compaction Trigger (#4687)
c04828abd1 is described below
commit c04828abd1f4df2a778d8dd60cdd86bac4aacaa1
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Thu Nov 7 12:22:56 2024 +0300
IGNITE-23294 Handle Metastorage leader change case for Compaction Trigger
(#4687)
---
.../CompactionRevisionUpdateListener.java | 2 +-
.../impl/ItMetaStorageCompactionTriggerTest.java | 54 ++++++++++++++++++++++
.../impl/MetaStorageCompactionTrigger.java | 26 ++++++++++-
.../server/AbstractKeyValueStorage.java | 25 +++++++++-
4 files changed, 103 insertions(+), 4 deletions(-)
diff --git
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/CompactionRevisionUpdateListener.java
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/CompactionRevisionUpdateListener.java
index 57f15ad994..7f8b1575ec 100644
---
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/CompactionRevisionUpdateListener.java
+++
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/CompactionRevisionUpdateListener.java
@@ -21,7 +21,7 @@ package org.apache.ignite.internal.metastorage;
* Listener which receives and handles the metastorage compaction revision
update after watches have been started.
*
* <p>It is guaranteed that it will <b>not</b> be invoked in parallel with the
handling of watch events or metastore revision update
- * events.</p>
+ * events, and that the compaction revisions it receives grow strictly monotonically, without duplicates.</p>
*/
@FunctionalInterface
public interface CompactionRevisionUpdateListener {
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageCompactionTriggerTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageCompactionTriggerTest.java
index 3b4e5860c5..e072eaad36 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageCompactionTriggerTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageCompactionTriggerTest.java
@@ -24,15 +24,21 @@ import static
org.apache.ignite.internal.metastorage.TestMetasStorageUtils.VALUE
import static
org.apache.ignite.internal.metastorage.TestMetasStorageUtils.allNodesContainSingleRevisionForKeyLocally;
import static
org.apache.ignite.internal.metastorage.TestMetasStorageUtils.createClusterConfigWithCompactionProperties;
import static
org.apache.ignite.internal.metastorage.TestMetasStorageUtils.latestKeyRevision;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.hasSize;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
+import java.util.List;
+import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import org.apache.ignite.InitParametersBuilder;
import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
@@ -90,6 +96,54 @@ public class ItMetaStorageCompactionTriggerTest extends
ClusterPerClassIntegrati
);
}
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void testCompactionRevisionMonotonouslyGrowsWithoutDuplicates(boolean
changeMetastorageLeader) {
+ MetaStorageManagerImpl metaStorageManager = (MetaStorageManagerImpl)
aliveNode().metaStorageManager();
+
+ long currentCompactionRevisionLocally =
metaStorageManager.getCompactionRevisionLocally();
+
+ log.info("Current compaction revision before start: " +
currentCompactionRevisionLocally);
+
+ var stopCollectCompactionRevisionFuture = new
CompletableFuture<Void>();
+ var compactionRevisions = new ConcurrentLinkedQueue<Long>();
+
+
metaStorageManager.storage().registerCompactionRevisionUpdateListener(compactionRevision
-> {
+ compactionRevisions.add(compactionRevision);
+
+ if (compactionRevision >= currentCompactionRevisionLocally + 100) {
+ stopCollectCompactionRevisionFuture.complete(null);
+ }
+ });
+
+ CompletableFuture<Void> updateMetastorageForCompactionFuture =
runAsync(() -> {
+ while (!stopCollectCompactionRevisionFuture.isDone()) {
+ assertThat(metaStorageManager.put(FOO_KEY, VALUE),
willCompleteSuccessfully());
+ }
+ });
+
+ CompletableFuture<Void> changeMetastorageLeaderFuture =
nullCompletedFuture();
+
+ if (changeMetastorageLeader) {
+ changeMetastorageLeaderFuture = runAsync(() -> {
+ while (!stopCollectCompactionRevisionFuture.isDone()) {
+ transferMetastorageLeadershipToAnotherNode();
+
+ Thread.sleep(50);
+ }
+ });
+ }
+
+ assertThat(stopCollectCompactionRevisionFuture,
willCompleteSuccessfully());
+ assertThat(updateMetastorageForCompactionFuture,
willCompleteSuccessfully());
+ assertThat(changeMetastorageLeaderFuture, willCompleteSuccessfully());
+
+ // Check that revisions grow strictly monotonically, which also rules out duplicates.
+ List<Long> copyCompactionRevisions = List.copyOf(compactionRevisions);
+ assertThat(copyCompactionRevisions, hasSize(greaterThan(0)));
+ assertEquals(List.copyOf(new TreeSet<>(copyCompactionRevisions)),
copyCompactionRevisions);
+ }
+
private static IgniteImpl aliveNode() {
return unwrapIgniteImpl(CLUSTER.aliveNode());
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageCompactionTrigger.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageCompactionTrigger.java
index 8731697a14..f33041ea3e 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageCompactionTrigger.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageCompactionTrigger.java
@@ -266,6 +266,30 @@ public class MetaStorageCompactionTrigger implements
IgniteComponent {
}
}
+ /**
+ * Schedules the next compaction unless one is already scheduled and not yet done. It should be invoked upon completion of a
compaction, so as not to schedule a second update alongside the one scheduled when the local node is
+ * elected as the new leader.
+ */
+ private void scheduleNextCompactionIfNotScheduledBusy() {
+ lock.lock();
+
+ try {
+ if (started && isLocalNodeLeader) {
+ ScheduledFuture<?> lastScheduledFuture =
this.lastScheduledFuture;
+
+ if (lastScheduledFuture == null ||
lastScheduledFuture.isDone()) {
+ this.lastScheduledFuture = compactionExecutor.schedule(
+ () -> inBusyLock(busyLock, this::doCompactionBusy),
+ config.interval(),
+ MILLISECONDS
+ );
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
/** Invoked when the metastorage compaction revision is updated. */
private void onCompactionRevisionUpdate(long compactionRevision) {
inBusyLockSafe(busyLock, () ->
onCompactionRevisionUpdateBusy(compactionRevision));
@@ -290,7 +314,7 @@ public class MetaStorageCompactionTrigger implements
IgniteComponent {
}
}
- inBusyLockSafe(busyLock, this::scheduleNextCompactionBusy);
+ inBusyLockSafe(busyLock,
this::scheduleNextCompactionIfNotScheduledBusy);
});
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java
index 9a77c26a8c..23112ae386 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java
@@ -84,6 +84,19 @@ public abstract class AbstractKeyValueStorage implements
KeyValueStorage {
*/
protected long compactionRevision = -1;
+ /**
+ * The largest compaction revision already scheduled for update (handed to the watch processor, or queued until watches are
started); used to keep compaction revision updates strictly monotonic, without duplicates.
+ *
+ * <p>This is needed to avoid the situation where, after a leader change, we
receive two requests to update the same compaction revision.
+ * The leader change problem is not solved at the protocol level, because the
update is performed asynchronously in the background, and
+ * commands for the same compaction revision may arrive from different leaders
before the update requested by the previous leader has been
+ * processed. This covers corner cases with a sufficiently small compaction
revision update interval.</p>
+ *
+ * <p>Multi-threaded access is guarded by {@link #rwLock}.</p>
+ */
+ private volatile long plannedUpdateCompactionRevision = -1;
+
protected final AtomicBoolean stopCompaction = new AtomicBoolean();
/** Tracks only cursors, since reading a single entry or a batch is done
entirely under {@link #rwLock}. */
@@ -265,8 +278,16 @@ public abstract class AbstractKeyValueStorage implements
KeyValueStorage {
if (isInRecoveryState()) {
setCompactionRevision(compactionRevision);
} else if (areWatchesStarted()) {
- watchProcessor.updateCompactionRevision(compactionRevision,
context.timestamp);
- } else {
+ if (compactionRevision > plannedUpdateCompactionRevision) {
+ plannedUpdateCompactionRevision = compactionRevision;
+
+
watchProcessor.updateCompactionRevision(compactionRevision, context.timestamp);
+ } else {
+ watchProcessor.advanceSafeTime(context.timestamp);
+ }
+ } else if (compactionRevision > plannedUpdateCompactionRevision) {
+ plannedUpdateCompactionRevision = compactionRevision;
+
var notifyWatchesEvent = new
UpdateCompactionRevisionEvent(compactionRevision, context.timestamp);
addToNotifyWatchProcessorEventsBeforeStartingWatches(notifyWatchesEvent);