This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 3c4dfbe561 IGNITE-23206 Update term used for Metastorage idle SafeTime 
propagation on leadership refresh (#4393)
3c4dfbe561 is described below

commit 3c4dfbe561431c80f8e0abd0e8fc5b2ea9a2daf8
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Fri Sep 13 17:40:27 2024 +0400

    IGNITE-23206 Update term used for Metastorage idle SafeTime propagation on 
leadership refresh (#4393)
    
     * If the elected leader is the same as before, still update the term used for 
Metastorage Idle SafeTime propagation
     * Fix a race condition in the test that was giving false positives. That is, 
we now make sure that 'old' SafeTime advancements (made before pausing 
secondary duties) are waited out before checking that no new advancements happen
     * Improve logging
---
 .../internal/future/timeout/TimeoutWorker.java     |  2 +-
 .../impl/ItMetaStorageMaintenanceTest.java         | 27 ++++++-
 .../impl/MetaStorageLeaderElectionListener.java    | 86 ++++++++++++----------
 .../server/raft/MetaStorageWriteHandler.java       |  4 +
 .../ignite/internal/raft/RaftGroupServiceImpl.java |  3 +-
 5 files changed, 79 insertions(+), 43 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/future/timeout/TimeoutWorker.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/future/timeout/TimeoutWorker.java
index ed7f07973c..3dcc3d75a5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/future/timeout/TimeoutWorker.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/future/timeout/TimeoutWorker.java
@@ -105,7 +105,7 @@ public class TimeoutWorker extends IgniteWorker {
                 try {
                     Thread.sleep(sleepInterval);
                 } catch (InterruptedException e) {
-                    log.info("The timeout worker was interrupted, probably the 
client is stopping.");
+                    log.info("The timeout worker was interrupted, probably the 
worker is stopping.");
                 }
 
                 updateHeartbeat();
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMaintenanceTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMaintenanceTest.java
index c331ef51f1..f7acce025e 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMaintenanceTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMaintenanceTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.metastorage.impl;
 
 import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.TimeUnit.SECONDS;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.getFieldValue;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutIn;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
@@ -33,7 +34,10 @@ import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExten
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.lang.NodeStoppingException;
+import 
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.WatchProcessor;
 import org.apache.ignite.internal.metastorage.server.time.ClusterTime;
+import org.apache.ignite.internal.util.ArrayUtils;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 
@@ -158,15 +162,32 @@ class ItMetaStorageMaintenanceTest extends 
ItMetaStorageMultipleNodesAbstractTes
         assertThat(node0.metaStorageManager.becomeLonelyLeader(true), 
willCompleteSuccessfully());
 
         ClusterTime clusterTime0 = node0.metaStorageManager.clusterTime();
-        HybridTimestamp timeBeforeOp = clusterTime0.currentSafeTime();
+
+        causeSafeTimeCommandsIssuedBeforePausingToBeApplied(node0);
 
         // Make sure the leader does not propagate Metastorage SafeTime (as we 
requested it to pause secondary duties).
+        HybridTimestamp timeAtStart = clusterTime0.currentSafeTime();
         assertFalse(
-                waitForCondition(() -> 
clusterTime0.currentSafeTime().longValue() > timeBeforeOp.longValue(), 
SECONDS.toMillis(2)),
-                "The leader still propagates safetime"
+                waitForCondition(() -> 
clusterTime0.currentSafeTime().longValue() > timeAtStart.longValue(), 
SECONDS.toMillis(2)),
+                () -> "The leader still propagates safetime " + 
clusterTime0.currentSafeTime()
         );
     }
 
+    private static void 
causeSafeTimeCommandsIssuedBeforePausingToBeApplied(Node node) {
+        // We execute a PUT command and then wait for SafeTime to be advanced. 
This guarantees that idle SafeTime propagation
+        // commands before pausing idle SafeTime propagation get executed and 
we don't get a non-relevant test failure.
+        assertThat(node.metaStorageManager.put(new ByteArray("abc"), 
ArrayUtils.BYTE_EMPTY_ARRAY), willCompleteSuccessfully());
+
+        // TODO: IGNITE-15723 After a component factory is implemented, need 
to get rid of reflection here.
+        var storage = (SimpleInMemoryKeyValueStorage) 
getFieldValue(node.metaStorageManager, MetaStorageManagerImpl.class, "storage");
+        var watchProcessor = (WatchProcessor) getFieldValue(storage, 
SimpleInMemoryKeyValueStorage.class, "watchProcessor");
+
+        CompletableFuture<Void> notificationFuture = 
getFieldValue(watchProcessor, WatchProcessor.class, "notificationFuture");
+        if (notificationFuture != null) {
+            assertThat(notificationFuture, willCompleteSuccessfully());
+        }
+    }
+
     @Test
     void becomeLonelyLeaderKeepsIdleSafeTimePropagationIfPauseNotRequested() 
throws Exception {
         enableIdleSafeTimeSync();
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageLeaderElectionListener.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageLeaderElectionListener.java
index b548e4bfa5..39819f7d81 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageLeaderElectionListener.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageLeaderElectionListener.java
@@ -122,53 +122,61 @@ public class MetaStorageLeaderElectionListener implements 
LeaderElectionListener
     public void onLeaderElected(ClusterNode node, long term) {
         electionListeners.forEach(listener -> listener.onLeaderElected(node));
 
-        synchronized (serializationFutureMux) {
-            if (node.name().equals(nodeName)) {
-                // We are the new leader. This does not necessarily mean we 
weren't previous leader (one node might
-                // be a leader 2 times in a row at least as a result of 
Metastorage repair).
-                LOG.info("Node has been elected as the leader, starting 
secondary duties");
-
-                thisNodeTerm = term;
-
-                if (serializationFuture == null) {
-                    // The node was not previous leader, and it becomes a 
leader.
-                    
logicalTopologyService.addEventListener(logicalTopologyEventListener);
+        boolean weAreNewLeader = node.name().equals(nodeName);
 
-                    LOG.info("Starting Idle Safe Time scheduler");
-
-                    metaStorageSvcFut
-                            .thenAcceptBoth(metaStorageConfigurationFuture, 
(service, metaStorageConfiguration) -> {
-                                clusterTime.startSafeTimeScheduler(
-                                        safeTime -> 
syncTimeIfSecondaryDutiesAreNotPaused(safeTime, term, service),
-                                        metaStorageConfiguration
-                                );
-                            })
-                            .whenComplete((v, e) -> {
-                                if (e != null) {
-                                    LOG.error("Unable to start Idle Safe Time 
scheduler", e);
-                                }
-                            });
-                }
+        synchronized (serializationFutureMux) {
+            boolean weWerePreviousLeader = serializationFuture != null;
 
-                // Update learner configuration (in case we missed some 
topology updates between elections).
-                serializationFuture = (serializationFuture == null ? 
nullCompletedFuture() : serializationFuture)
-                        .thenCompose(unused -> 
updateLearnersIfSecondaryDutiesAreNotPaused(term));
-            } else if (serializationFuture != null) {
-                LOG.info("Node has lost the leadership, stopping Idle Safe 
Time scheduler");
+            if (weWerePreviousLeader && !weAreNewLeader) {
+                LOG.info("Node has lost the leadership, stopping doing 
secondary duties");
 
                 thisNodeTerm = null;
 
-                
logicalTopologyService.removeEventListener(logicalTopologyEventListener);
-
                 clusterTime.stopSafeTimeScheduler();
 
+                
logicalTopologyService.removeEventListener(logicalTopologyEventListener);
+
                 serializationFuture.cancel(false);
 
                 serializationFuture = null;
             }
+
+            if (weAreNewLeader) {
+                thisNodeTerm = term;
+
+                if (!weWerePreviousLeader) {
+                    LOG.info("Node has been elected as the leader (and it 
wasn't previous leader), so starting doing secondary duties");
+
+                    startSafeTimeScheduler();
+
+                    // The node was not previous leader, and it becomes a 
leader.
+                    
logicalTopologyService.addEventListener(logicalTopologyEventListener);
+
+                    // Update learner configuration (in case we missed some 
topology updates between elections).
+                    serializationFuture = (serializationFuture == null ? 
nullCompletedFuture() : serializationFuture)
+                            .thenCompose(unused -> 
updateLearnersIfSecondaryDutiesAreNotPaused(term));
+                } else {
+                    LOG.info("Node has been reelected as the leader");
+                }
+            }
         }
     }
 
+    private void startSafeTimeScheduler() {
+        metaStorageSvcFut
+                .thenAcceptBoth(metaStorageConfigurationFuture, (service, 
metaStorageConfiguration) -> {
+                    clusterTime.startSafeTimeScheduler(
+                            safeTime -> 
syncTimeIfSecondaryDutiesAreNotPaused(safeTime, service),
+                            metaStorageConfiguration
+                    );
+                })
+                .whenComplete((v, e) -> {
+                    if (e != null) {
+                        LOG.error("Unable to start Idle Safe Time scheduler", 
e);
+                    }
+                });
+    }
+
     private CompletableFuture<Void> 
updateLearnersIfSecondaryDutiesAreNotPaused(long term) {
         if (leaderSecondaryDutiesPaused.getAsBoolean()) {
             return nullCompletedFuture();
@@ -177,15 +185,17 @@ public class MetaStorageLeaderElectionListener implements 
LeaderElectionListener
         return metaStorageSvcFut.thenCompose(service -> 
resetLearners(service.raftGroupService(), term));
     }
 
-    private CompletableFuture<Void> syncTimeIfSecondaryDutiesAreNotPaused(
-            HybridTimestamp safeTime,
-            long term,
-            MetaStorageServiceImpl service
-    ) {
+    private CompletableFuture<Void> 
syncTimeIfSecondaryDutiesAreNotPaused(HybridTimestamp safeTime, 
MetaStorageServiceImpl service) {
         if (leaderSecondaryDutiesPaused.getAsBoolean()) {
             return nullCompletedFuture();
         }
 
+        Long term = thisNodeTerm;
+        if (term == null) {
+            // We ceased to be a leader, do nothing.
+            return nullCompletedFuture();
+        }
+
         return service.syncTime(safeTime, term);
     }
 
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
index f6fd3b827e..93e1c7f27e 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
@@ -136,6 +136,10 @@ public class MetaStorageWriteHandler {
 
                     // Ignore the command if it has been sent by a stale 
leader.
                     if (clo.term() != syncTimeCommand.initiatorTerm()) {
+                        LOG.info("Sync time command closure term {}, initiator 
term {}, ignoring the command",
+                                clo.term(), syncTimeCommand.initiatorTerm()
+                        );
+
                         clo.result(null);
 
                         return;
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
index fc7526bc83..54b7517ec3 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
@@ -598,7 +598,8 @@ public class RaftGroupServiceImpl implements 
RaftGroupService {
                     .thenCompose(node -> 
cluster.messagingService().invoke(node, request, 
configuration.responseTimeout().value()))
                     .whenComplete((resp, err) -> {
                         if (LOG.isTraceEnabled()) {
-                            LOG.trace("sendWithRetry resp={} from={} to={} 
err={}",
+                            LOG.trace("sendWithRetry req={} resp={} from={} 
to={} err={}",
+                                    request,
                                     resp,
                                     
cluster.topologyService().localMember().address(),
                                     peer.consistentId(),

Reply via email to