This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new c04828abd1 IGNITE-23294 Handle Metastorage leader change case for Compaction Trigger (#4687)
c04828abd1 is described below

commit c04828abd1f4df2a778d8dd60cdd86bac4aacaa1
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Thu Nov 7 12:22:56 2024 +0300

    IGNITE-23294 Handle Metastorage leader change case for Compaction Trigger (#4687)
---
 .../CompactionRevisionUpdateListener.java          |  2 +-
 .../impl/ItMetaStorageCompactionTriggerTest.java   | 54 ++++++++++++++++++++++
 .../impl/MetaStorageCompactionTrigger.java         | 26 ++++++++++-
 .../server/AbstractKeyValueStorage.java            | 25 +++++++++-
 4 files changed, 103 insertions(+), 4 deletions(-)

diff --git a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/CompactionRevisionUpdateListener.java b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/CompactionRevisionUpdateListener.java
index 57f15ad994..7f8b1575ec 100644
--- a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/CompactionRevisionUpdateListener.java
+++ b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/CompactionRevisionUpdateListener.java
@@ -21,7 +21,7 @@ package org.apache.ignite.internal.metastorage;
  * Listener which receives and handles the metastorage compaction revision 
update after watches have been started.
  *
  * <p>It is guaranteed that it will <b>not</b> be invoked in parallel with the 
handling of watch events or metastore revision update
- * events.</p>
+ * events, and it will also grow monotonously without duplicates.</p>
  */
 @FunctionalInterface
 public interface CompactionRevisionUpdateListener {
diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageCompactionTriggerTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageCompactionTriggerTest.java
index 3b4e5860c5..e072eaad36 100644
--- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageCompactionTriggerTest.java
+++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageCompactionTriggerTest.java
@@ -24,15 +24,21 @@ import static org.apache.ignite.internal.metastorage.TestMetasStorageUtils.VALUE
 import static 
org.apache.ignite.internal.metastorage.TestMetasStorageUtils.allNodesContainSingleRevisionForKeyLocally;
 import static 
org.apache.ignite.internal.metastorage.TestMetasStorageUtils.createClusterConfigWithCompactionProperties;
 import static 
org.apache.ignite.internal.metastorage.TestMetasStorageUtils.latestKeyRevision;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.hasSize;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
+import java.util.List;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
 import org.apache.ignite.InitParametersBuilder;
 import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
@@ -90,6 +96,54 @@ public class ItMetaStorageCompactionTriggerTest extends ClusterPerClassIntegrati
         );
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void testCompactionRevisionMonotonouslyGrowsWithoutDuplicates(boolean 
changeMetastorageLeader) {
+        MetaStorageManagerImpl metaStorageManager = (MetaStorageManagerImpl) 
aliveNode().metaStorageManager();
+
+        long currentCompactionRevisionLocally = 
metaStorageManager.getCompactionRevisionLocally();
+
+        log.info("Current compaction revision before start: " + 
currentCompactionRevisionLocally);
+
+        var stopCollectCompactionRevisionFuture = new 
CompletableFuture<Void>();
+        var compactionRevisions = new ConcurrentLinkedQueue<Long>();
+
+        
metaStorageManager.storage().registerCompactionRevisionUpdateListener(compactionRevision
 -> {
+            compactionRevisions.add(compactionRevision);
+
+            if (compactionRevision >= currentCompactionRevisionLocally + 100) {
+                stopCollectCompactionRevisionFuture.complete(null);
+            }
+        });
+
+        CompletableFuture<Void> updateMetastorageForCompactionFuture = 
runAsync(() -> {
+            while (!stopCollectCompactionRevisionFuture.isDone()) {
+                assertThat(metaStorageManager.put(FOO_KEY, VALUE), 
willCompleteSuccessfully());
+            }
+        });
+
+        CompletableFuture<Void> changeMetastorageLeaderFuture = 
nullCompletedFuture();
+
+        if (changeMetastorageLeader) {
+            changeMetastorageLeaderFuture = runAsync(() -> {
+                while (!stopCollectCompactionRevisionFuture.isDone()) {
+                    transferMetastorageLeadershipToAnotherNode();
+
+                    Thread.sleep(50);
+                }
+            });
+        }
+
+        assertThat(stopCollectCompactionRevisionFuture, 
willCompleteSuccessfully());
+        assertThat(updateMetastorageForCompactionFuture, 
willCompleteSuccessfully());
+        assertThat(changeMetastorageLeaderFuture, willCompleteSuccessfully());
+
+        // Let's check that there are no duplicates.
+        List<Long> copyCompactionRevisions = List.copyOf(compactionRevisions);
+        assertThat(copyCompactionRevisions, hasSize(greaterThan(0)));
+        assertThat(Set.copyOf(copyCompactionRevisions), 
hasSize(copyCompactionRevisions.size()));
+    }
+
     private static IgniteImpl aliveNode() {
         return unwrapIgniteImpl(CLUSTER.aliveNode());
     }
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageCompactionTrigger.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageCompactionTrigger.java
index 8731697a14..f33041ea3e 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageCompactionTrigger.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageCompactionTrigger.java
@@ -266,6 +266,30 @@ public class MetaStorageCompactionTrigger implements IgniteComponent {
         }
     }
 
+    /**
+     * It should be used precisely at the complete of the compaction, so as 
not to schedule a new update together with the event of
+     * electing a local node as a new leader.
+     */
+    private void scheduleNextCompactionIfNotScheduleBusy() {
+        lock.lock();
+
+        try {
+            if (started && isLocalNodeLeader) {
+                ScheduledFuture<?> lastScheduledFuture = 
this.lastScheduledFuture;
+
+                if (lastScheduledFuture == null || 
lastScheduledFuture.isDone()) {
+                    this.lastScheduledFuture = compactionExecutor.schedule(
+                            () -> inBusyLock(busyLock, this::doCompactionBusy),
+                            config.interval(),
+                            MILLISECONDS
+                    );
+                }
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
     /** Invoked when the metastorage compaction revision is updated. */
     private void onCompactionRevisionUpdate(long compactionRevision) {
         inBusyLockSafe(busyLock, () -> 
onCompactionRevisionUpdateBusy(compactionRevision));
@@ -290,7 +314,7 @@ public class MetaStorageCompactionTrigger implements IgniteComponent {
                         }
                     }
 
-                    inBusyLockSafe(busyLock, this::scheduleNextCompactionBusy);
+                    inBusyLockSafe(busyLock, 
this::scheduleNextCompactionIfNotScheduleBusy);
                 });
     }
 
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java
index 9a77c26a8c..23112ae386 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java
@@ -84,6 +84,19 @@ public abstract class AbstractKeyValueStorage implements KeyValueStorage {
      */
     protected long compactionRevision = -1;
 
+    /**
+     * Planned for update compaction revision to ensure monotony without 
duplicates when updating it.
+     *
+     * <p>This is necessary to avoid a situation when changing the leader, we 
get two requests to update the same compaction revision.
+     * Fixing the leader change problem is not at the protocol level since the 
update is performed asynchronously and in the background and
+     * we can get into a gap when commands came from different leaders to the 
same compaction revision, but we simply did not have time to
+     * process the update of the compaction revision from the previous leader. 
This is necessary to cover corner cases with a sufficiently
+     * small compaction revision update interval.</p>
+     *
+     * <p>Multi-threaded access is guarded by {@link #rwLock}.</p>
+     */
+    private volatile long planedUpdateCompactionRevision = -1;
+
     protected final AtomicBoolean stopCompaction = new AtomicBoolean();
 
     /** Tracks only cursors, since reading a single entry or a batch is done 
entirely under {@link #rwLock}. */
@@ -265,8 +278,16 @@ public abstract class AbstractKeyValueStorage implements KeyValueStorage {
             if (isInRecoveryState()) {
                 setCompactionRevision(compactionRevision);
             } else if (areWatchesStarted()) {
-                watchProcessor.updateCompactionRevision(compactionRevision, 
context.timestamp);
-            } else {
+                if (compactionRevision > planedUpdateCompactionRevision) {
+                    planedUpdateCompactionRevision = compactionRevision;
+
+                    
watchProcessor.updateCompactionRevision(compactionRevision, context.timestamp);
+                } else {
+                    watchProcessor.advanceSafeTime(context.timestamp);
+                }
+            } else if (compactionRevision > planedUpdateCompactionRevision) {
+                planedUpdateCompactionRevision = compactionRevision;
+
                 var notifyWatchesEvent = new 
UpdateCompactionRevisionEvent(compactionRevision, context.timestamp);
 
                 
addToNotifyWatchProcessorEventsBeforeStartingWatches(notifyWatchesEvent);

Reply via email to