This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 3c4dfbe561 IGNITE-23206 Update term used for Metastorage idle SafeTime
propagation on leadership refresh (#4393)
3c4dfbe561 is described below
commit 3c4dfbe561431c80f8e0abd0e8fc5b2ea9a2daf8
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Fri Sep 13 17:40:27 2024 +0400
IGNITE-23206 Update term used for Metastorage idle SafeTime propagation on
leadership refresh (#4393)
* If the elected leader is the same as before, still update the term used for
Metastorage Idle SafeTime propagation
* Fix a race condition in a test that was causing spurious failures (false positives). That is,
we now make sure that 'old' SafeTime advancements (made before pausing
secondary duties) are waited out before checking that no new advancements happen
* Improve logging
---
.../internal/future/timeout/TimeoutWorker.java | 2 +-
.../impl/ItMetaStorageMaintenanceTest.java | 27 ++++++-
.../impl/MetaStorageLeaderElectionListener.java | 86 ++++++++++++----------
.../server/raft/MetaStorageWriteHandler.java | 4 +
.../ignite/internal/raft/RaftGroupServiceImpl.java | 3 +-
5 files changed, 79 insertions(+), 43 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/future/timeout/TimeoutWorker.java
b/modules/core/src/main/java/org/apache/ignite/internal/future/timeout/TimeoutWorker.java
index ed7f07973c..3dcc3d75a5 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/future/timeout/TimeoutWorker.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/future/timeout/TimeoutWorker.java
@@ -105,7 +105,7 @@ public class TimeoutWorker extends IgniteWorker {
try {
Thread.sleep(sleepInterval);
} catch (InterruptedException e) {
- log.info("The timeout worker was interrupted, probably the
client is stopping.");
+ log.info("The timeout worker was interrupted, probably the
worker is stopping.");
}
updateHeartbeat();
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMaintenanceTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMaintenanceTest.java
index c331ef51f1..f7acce025e 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMaintenanceTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMaintenanceTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.metastorage.impl;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.TimeUnit.SECONDS;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.getFieldValue;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutIn;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
@@ -33,7 +34,10 @@ import
org.apache.ignite.internal.configuration.testframework.ConfigurationExten
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.lang.NodeStoppingException;
+import
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.WatchProcessor;
import org.apache.ignite.internal.metastorage.server.time.ClusterTime;
+import org.apache.ignite.internal.util.ArrayUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -158,15 +162,32 @@ class ItMetaStorageMaintenanceTest extends
ItMetaStorageMultipleNodesAbstractTes
assertThat(node0.metaStorageManager.becomeLonelyLeader(true),
willCompleteSuccessfully());
ClusterTime clusterTime0 = node0.metaStorageManager.clusterTime();
- HybridTimestamp timeBeforeOp = clusterTime0.currentSafeTime();
+
+ causeSafeTimeCommandsIssuedBeforePausingToBeApplied(node0);
// Make sure the leader does not propagate Metastorage SafeTime (as we
requested it to pause secondary duties).
+ HybridTimestamp timeAtStart = clusterTime0.currentSafeTime();
assertFalse(
- waitForCondition(() ->
clusterTime0.currentSafeTime().longValue() > timeBeforeOp.longValue(),
SECONDS.toMillis(2)),
- "The leader still propagates safetime"
+ waitForCondition(() ->
clusterTime0.currentSafeTime().longValue() > timeAtStart.longValue(),
SECONDS.toMillis(2)),
+ () -> "The leader still propagates safetime " +
clusterTime0.currentSafeTime()
);
}
+ private static void
causeSafeTimeCommandsIssuedBeforePausingToBeApplied(Node node) {
+ // We execute a PUT command and then wait for SafeTime to be advanced.
This guarantees that idle SafeTime propagation
+ // commands issued before idle SafeTime propagation was paused get applied, so
we don't get an irrelevant test failure.
+ assertThat(node.metaStorageManager.put(new ByteArray("abc"),
ArrayUtils.BYTE_EMPTY_ARRAY), willCompleteSuccessfully());
+
+ // TODO: IGNITE-15723 After a component factory is implemented, need
to get rid of reflection here.
+ var storage = (SimpleInMemoryKeyValueStorage)
getFieldValue(node.metaStorageManager, MetaStorageManagerImpl.class, "storage");
+ var watchProcessor = (WatchProcessor) getFieldValue(storage,
SimpleInMemoryKeyValueStorage.class, "watchProcessor");
+
+ CompletableFuture<Void> notificationFuture =
getFieldValue(watchProcessor, WatchProcessor.class, "notificationFuture");
+ if (notificationFuture != null) {
+ assertThat(notificationFuture, willCompleteSuccessfully());
+ }
+ }
+
@Test
void becomeLonelyLeaderKeepsIdleSafeTimePropagationIfPauseNotRequested()
throws Exception {
enableIdleSafeTimeSync();
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageLeaderElectionListener.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageLeaderElectionListener.java
index b548e4bfa5..39819f7d81 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageLeaderElectionListener.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageLeaderElectionListener.java
@@ -122,53 +122,61 @@ public class MetaStorageLeaderElectionListener implements
LeaderElectionListener
public void onLeaderElected(ClusterNode node, long term) {
electionListeners.forEach(listener -> listener.onLeaderElected(node));
- synchronized (serializationFutureMux) {
- if (node.name().equals(nodeName)) {
- // We are the new leader. This does not necessarily mean we
weren't previous leader (one node might
- // be a leader 2 times in a row at least as a result of
Metastorage repair).
- LOG.info("Node has been elected as the leader, starting
secondary duties");
-
- thisNodeTerm = term;
-
- if (serializationFuture == null) {
- // The node was not previous leader, and it becomes a
leader.
-
logicalTopologyService.addEventListener(logicalTopologyEventListener);
+ boolean weAreNewLeader = node.name().equals(nodeName);
- LOG.info("Starting Idle Safe Time scheduler");
-
- metaStorageSvcFut
- .thenAcceptBoth(metaStorageConfigurationFuture,
(service, metaStorageConfiguration) -> {
- clusterTime.startSafeTimeScheduler(
- safeTime ->
syncTimeIfSecondaryDutiesAreNotPaused(safeTime, term, service),
- metaStorageConfiguration
- );
- })
- .whenComplete((v, e) -> {
- if (e != null) {
- LOG.error("Unable to start Idle Safe Time
scheduler", e);
- }
- });
- }
+ synchronized (serializationFutureMux) {
+ boolean weWerePreviousLeader = serializationFuture != null;
- // Update learner configuration (in case we missed some
topology updates between elections).
- serializationFuture = (serializationFuture == null ?
nullCompletedFuture() : serializationFuture)
- .thenCompose(unused ->
updateLearnersIfSecondaryDutiesAreNotPaused(term));
- } else if (serializationFuture != null) {
- LOG.info("Node has lost the leadership, stopping Idle Safe
Time scheduler");
+ if (weWerePreviousLeader && !weAreNewLeader) {
+ LOG.info("Node has lost the leadership, stopping doing
secondary duties");
thisNodeTerm = null;
-
logicalTopologyService.removeEventListener(logicalTopologyEventListener);
-
clusterTime.stopSafeTimeScheduler();
+
logicalTopologyService.removeEventListener(logicalTopologyEventListener);
+
serializationFuture.cancel(false);
serializationFuture = null;
}
+
+ if (weAreNewLeader) {
+ thisNodeTerm = term;
+
+ if (!weWerePreviousLeader) {
+ LOG.info("Node has been elected as the leader (and it
wasn't previous leader), so starting doing secondary duties");
+
+ startSafeTimeScheduler();
+
+ // The node was not previous leader, and it becomes a
leader.
+
logicalTopologyService.addEventListener(logicalTopologyEventListener);
+
+ // Update learner configuration (in case we missed some
topology updates between elections).
+ serializationFuture = (serializationFuture == null ?
nullCompletedFuture() : serializationFuture)
+ .thenCompose(unused ->
updateLearnersIfSecondaryDutiesAreNotPaused(term));
+ } else {
+ LOG.info("Node has been reelected as the leader");
+ }
+ }
}
}
+ private void startSafeTimeScheduler() {
+ metaStorageSvcFut
+ .thenAcceptBoth(metaStorageConfigurationFuture, (service,
metaStorageConfiguration) -> {
+ clusterTime.startSafeTimeScheduler(
+ safeTime ->
syncTimeIfSecondaryDutiesAreNotPaused(safeTime, service),
+ metaStorageConfiguration
+ );
+ })
+ .whenComplete((v, e) -> {
+ if (e != null) {
+ LOG.error("Unable to start Idle Safe Time scheduler",
e);
+ }
+ });
+ }
+
private CompletableFuture<Void>
updateLearnersIfSecondaryDutiesAreNotPaused(long term) {
if (leaderSecondaryDutiesPaused.getAsBoolean()) {
return nullCompletedFuture();
@@ -177,15 +185,17 @@ public class MetaStorageLeaderElectionListener implements
LeaderElectionListener
return metaStorageSvcFut.thenCompose(service ->
resetLearners(service.raftGroupService(), term));
}
- private CompletableFuture<Void> syncTimeIfSecondaryDutiesAreNotPaused(
- HybridTimestamp safeTime,
- long term,
- MetaStorageServiceImpl service
- ) {
+ private CompletableFuture<Void>
syncTimeIfSecondaryDutiesAreNotPaused(HybridTimestamp safeTime,
MetaStorageServiceImpl service) {
if (leaderSecondaryDutiesPaused.getAsBoolean()) {
return nullCompletedFuture();
}
+ Long term = thisNodeTerm;
+ if (term == null) {
+ // We ceased to be the leader, do nothing.
+ return nullCompletedFuture();
+ }
+
return service.syncTime(safeTime, term);
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
index f6fd3b827e..93e1c7f27e 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
@@ -136,6 +136,10 @@ public class MetaStorageWriteHandler {
// Ignore the command if it has been sent by a stale
leader.
if (clo.term() != syncTimeCommand.initiatorTerm()) {
+ LOG.info("Sync time command closure term {}, initiator
term {}, ignoring the command",
+ clo.term(), syncTimeCommand.initiatorTerm()
+ );
+
clo.result(null);
return;
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
index fc7526bc83..54b7517ec3 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
@@ -598,7 +598,8 @@ public class RaftGroupServiceImpl implements
RaftGroupService {
.thenCompose(node ->
cluster.messagingService().invoke(node, request,
configuration.responseTimeout().value()))
.whenComplete((resp, err) -> {
if (LOG.isTraceEnabled()) {
- LOG.trace("sendWithRetry resp={} from={} to={}
err={}",
+ LOG.trace("sendWithRetry req={} resp={} from={}
to={} err={}",
+ request,
resp,
cluster.topologyService().localMember().address(),
peer.consistentId(),