This is an automated email from the ASF dual-hosted git repository. samt pushed a commit to branch cep-21-tcm in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 8fd47e605f41a353efd8696ccecf46a7947a9c65 Author: Alex Petrov <[email protected]> AuthorDate: Tue Mar 7 19:08:52 2023 +0100 [CEP-21] Implement retries for log replay on CMS members patch by Alex Petrov; reviewed by Marcus Eriksson and Sam Tunnicliffe for CASSANDRA-18408 --- .../apache/cassandra/net/ResponseVerbHandler.java | 20 ++++++++++----- .../cassandra/tcm/AbstractLocalProcessor.java | 30 +++++++++++++++++++--- .../cassandra/tcm/AtomicLongBackedProcessor.java | 2 +- .../apache/cassandra/tcm/PaxosBackedProcessor.java | 5 ++-- .../distributed/test/log/FailedLeaveTest.java | 2 +- 5 files changed, 44 insertions(+), 15 deletions(-) diff --git a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java index 6ae760e491..40b56d988b 100644 --- a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java +++ b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java @@ -17,11 +17,15 @@ */ package org.apache.cassandra.net; +import java.util.EnumSet; +import java.util.Set; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.exceptions.RequestFailureReason; +import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.ClusterMetadataService; import org.apache.cassandra.tracing.Tracing; @@ -33,17 +37,19 @@ class ResponseVerbHandler implements IVerbHandler public static final ResponseVerbHandler instance = new ResponseVerbHandler(); private static final Logger logger = LoggerFactory.getLogger(ResponseVerbHandler.class); + private static final Set<Verb> SKIP_CATCHUP_FOR = EnumSet.of(Verb.TCM_REPLAY_RSP, + Verb.TCM_COMMIT_RSP, + Verb.TCM_REPLICATION, + Verb.TCM_NOTIFY_RSP, + Verb.TCM_DISCOVER_RSP, + Verb.TCM_INIT_MIG_RSP); @Override public void doVerb(Message message) { - if (message.verb() != Verb.TCM_REPLAY_RSP && - message.verb() != Verb.TCM_COMMIT_RSP && - message.verb() != Verb.TCM_REPLICATION && - message.verb() != Verb.TCM_NOTIFY_RSP && - message.verb() != Verb.TCM_DISCOVER_RSP && - message.verb() != Verb.TCM_INIT_MIG_RSP && - message.verb() != Verb.TCM_INIT_MIG_RSP && + + if (message.epoch().isAfter(ClusterMetadata.current().epoch) && + !SKIP_CATCHUP_FOR.contains(message.verb()) && // Gossip stage is single-threaded, so we may end up in a deadlock with after-commit hook // that executes something on the gossip stage as well. !Stage.GOSSIP.executor().inExecutor()) diff --git a/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java b/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java index 07950ce6e9..d657e9bd90 100644 --- a/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java +++ b/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java @@ -40,7 +40,7 @@ public abstract class AbstractLocalProcessor implements Processor } @Override - public Commit.Result commit(Entry.Id entryId, Transformation transform, final Epoch lastKnown) + public final Commit.Result commit(Entry.Id entryId, Transformation transform, final Epoch lastKnown) { Transformation.Result result; @@ -92,7 +92,8 @@ public abstract class AbstractLocalProcessor implements Processor // if we're rejected, just try to catch up to the latest distributed state if (result.isRejected()) { - Epoch replayed = replayAndWait().epoch; + Epoch replayed = replayAndWait(jitter).epoch; + // Retry if replay has changed the epoch, return rejection otherwise. if (!replayed.isAfter(previous.epoch)) return result.rejected(); @@ -122,12 +123,35 @@ public abstract class AbstractLocalProcessor implements Processor else { // It may happen that we have raced with a different processor, in which case we need to catch up and retry. - replayAndWait(); + replayAndWait(jitter); jitter.maybeSleep(); } } } + @Override + public final ClusterMetadata replayAndWait() + { + return replayAndWait(new Retry.Jitter()); + } + + protected final ClusterMetadata replayAndWait(Retry retry) + { + while (true) + { + try + { + return tryReplayAndWait(); + } + catch (Throwable t) + { + if (retry.reachedMax()) + throw new IllegalStateException(String.format("Could not succeed with replay after %s tries.", retry.currentTries())); + } + } + } + + protected abstract ClusterMetadata tryReplayAndWait(); protected abstract boolean tryCommitOne(Entry.Id entryId, Transformation transform, Epoch previousEpoch, Epoch nextEpoch, long previousPeriod, long nextPeriod, boolean sealPeriod); diff --git a/src/java/org/apache/cassandra/tcm/AtomicLongBackedProcessor.java b/src/java/org/apache/cassandra/tcm/AtomicLongBackedProcessor.java index 975d581b6c..b85cd5310f 100644 --- a/src/java/org/apache/cassandra/tcm/AtomicLongBackedProcessor.java +++ b/src/java/org/apache/cassandra/tcm/AtomicLongBackedProcessor.java @@ -68,7 +68,7 @@ public class AtomicLongBackedProcessor extends AbstractLocalProcessor } @Override - public ClusterMetadata replayAndWait() + protected ClusterMetadata tryReplayAndWait() { return log.waitForHighestConsecutive(); } diff --git a/src/java/org/apache/cassandra/tcm/PaxosBackedProcessor.java b/src/java/org/apache/cassandra/tcm/PaxosBackedProcessor.java index 2ab2f921e9..f65804b1da 100644 --- a/src/java/org/apache/cassandra/tcm/PaxosBackedProcessor.java +++ b/src/java/org/apache/cassandra/tcm/PaxosBackedProcessor.java @@ -55,10 +55,9 @@ public class PaxosBackedProcessor extends AbstractLocalProcessor } @Override - public ClusterMetadata replayAndWait() + protected ClusterMetadata tryReplayAndWait() { - log.waitForHighestConsecutive(); - ClusterMetadata metadata = log.metadata(); + ClusterMetadata metadata = log.waitForHighestConsecutive(); Set<Replica> replicas = metadata.fullCMSMembersAsReplicas(); diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/FailedLeaveTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/FailedLeaveTest.java index 739ec3db35..b7509ff40f 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/log/FailedLeaveTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/log/FailedLeaveTest.java @@ -122,7 +122,7 @@ public class FailedLeaveTest extends FuzzTestBase Epoch currentEpoch = getClusterMetadataVersion(cmsInstance); Assert.assertEquals(startEpoch.getEpoch() + 2, currentEpoch.getEpoch()); - // Node 2's leaving failed due to the streaming errors. If decommission is called again on the it should + // Node 2's leaving failed due to the streaming errors. If decommission is called again on the node, it should // resume where it left off. Allow streaming to succeed this time and verify that the node is able to // finish leaving. cluster.get(3).runOnInstance(() -> BB.failReceivingStream.set(false)); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
