This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch cep-21-tcm
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 8fd47e605f41a353efd8696ccecf46a7947a9c65
Author: Alex Petrov <[email protected]>
AuthorDate: Tue Mar 7 19:08:52 2023 +0100

    [CEP-21] Implement retries for log replay on CMS members
    
    patch by Alex Petrov; reviewed by Marcus Eriksson and Sam Tunnicliffe
    for CASSANDRA-18408
---
 .../apache/cassandra/net/ResponseVerbHandler.java  | 20 ++++++++++-----
 .../cassandra/tcm/AbstractLocalProcessor.java      | 30 +++++++++++++++++++---
 .../cassandra/tcm/AtomicLongBackedProcessor.java   |  2 +-
 .../apache/cassandra/tcm/PaxosBackedProcessor.java |  5 ++--
 .../distributed/test/log/FailedLeaveTest.java      |  2 +-
 5 files changed, 44 insertions(+), 15 deletions(-)

diff --git a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
index 6ae760e491..40b56d988b 100644
--- a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
+++ b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
@@ -17,11 +17,15 @@
  */
 package org.apache.cassandra.net;

+import java.util.EnumSet;
+import java.util.Set;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.exceptions.RequestFailureReason;
+import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.ClusterMetadataService;
 import org.apache.cassandra.tracing.Tracing;

@@ -33,17 +37,20 @@ class ResponseVerbHandler implements IVerbHandler
     public static final ResponseVerbHandler instance = new ResponseVerbHandler();

     private static final Logger logger = LoggerFactory.getLogger(ResponseVerbHandler.class);
+    // Verbs whose messages never take the epoch catch-up branch in doVerb, whatever epoch they carry.
+    private static final Set<Verb> SKIP_CATCHUP_FOR = EnumSet.of(Verb.TCM_REPLAY_RSP,
+                                                                 Verb.TCM_COMMIT_RSP,
+                                                                 Verb.TCM_REPLICATION,
+                                                                 Verb.TCM_NOTIFY_RSP,
+                                                                 Verb.TCM_DISCOVER_RSP,
+                                                                 Verb.TCM_INIT_MIG_RSP);

     @Override
     public void doVerb(Message message)
     {
-        if (message.verb() != Verb.TCM_REPLAY_RSP &&
-            message.verb() != Verb.TCM_COMMIT_RSP &&
-            message.verb() != Verb.TCM_REPLICATION &&
-            message.verb() != Verb.TCM_NOTIFY_RSP &&
-            message.verb() != Verb.TCM_DISCOVER_RSP &&
-            message.verb() != Verb.TCM_INIT_MIG_RSP &&
-            message.verb() != Verb.TCM_INIT_MIG_RSP &&
+        // Test the cheap verb filter first, so ClusterMetadata.current() is only consulted for verbs not in SKIP_CATCHUP_FOR.
+        if (!SKIP_CATCHUP_FOR.contains(message.verb()) &&
+            message.epoch().isAfter(ClusterMetadata.current().epoch) &&
             // Gossip stage is single-threaded, so we may end up in a deadlock with after-commit hook
             // that executes something on the gossip stage as well.
             !Stage.GOSSIP.executor().inExecutor())
diff --git a/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java b/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java
index 07950ce6e9..d657e9bd90 100644
--- a/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java
+++ b/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java
@@ -40,7 +40,7 @@ public abstract class AbstractLocalProcessor implements Processor
     }

     @Override
-    public Commit.Result commit(Entry.Id entryId, Transformation transform, final Epoch lastKnown)
+    public final Commit.Result commit(Entry.Id entryId, Transformation transform, final Epoch lastKnown)
     {
         Transformation.Result result;

@@ -92,7 +92,8 @@ public abstract class AbstractLocalProcessor implements Processor
             // if we're rejected, just try to catch up to the latest distributed state
             if (result.isRejected())
             {
-                Epoch replayed = replayAndWait().epoch;
+                Epoch replayed = replayAndWait(jitter).epoch;
+
                 // Retry if replay has changed the epoch, return rejection otherwise.
                 if (!replayed.isAfter(previous.epoch))
                     return result.rejected();
@@ -122,12 +123,47 @@ public abstract class AbstractLocalProcessor implements Processor
             else
             {
                 // It may happen that we have raced with a different processor, in which case we need to catch up and retry.
-                replayAndWait();
+                replayAndWait(jitter);
                 jitter.maybeSleep();
             }
         }
     }

+    @Override
+    public final ClusterMetadata replayAndWait()
+    {
+        return replayAndWait(new Retry.Jitter());
+    }
+
+    /**
+     * Replays the log via {@link #tryReplayAndWait()}, retrying failed attempts until {@code retry} reaches its maximum.
+     *
+     * @param retry policy bounding and pacing the attempts; {@link Retry#maybeSleep()} is called after each failed one
+     * @return the metadata returned by the first successful {@link #tryReplayAndWait()} call
+     * @throws IllegalStateException if replay is still failing once {@code retry} has reached its maximum number of
+     *                               tries; the last failure is attached as the cause
+     */
+    protected final ClusterMetadata replayAndWait(Retry retry)
+    {
+        while (true)
+        {
+            try
+            {
+                return tryReplayAndWait();
+            }
+            catch (Throwable t)
+            {
+                if (retry.reachedMax())
+                    throw new IllegalStateException(String.format("Could not succeed with replay after %s tries.", retry.currentTries()), t);
+                // Back off via maybeSleep(), as the commit loop does: nothing else in this loop advances the Retry, so
+                // without it reachedMax() would never flip and a persistently failing replay would busy-spin forever.
+                retry.maybeSleep();
+            }
+        }
+    }
+
+    /** A single replay attempt; any failure it throws is retried by {@link #replayAndWait(Retry)}. */
+    protected abstract ClusterMetadata tryReplayAndWait();
     protected abstract boolean tryCommitOne(Entry.Id entryId, Transformation transform,
                                             Epoch previousEpoch, Epoch nextEpoch,
                                             long previousPeriod, long nextPeriod, boolean sealPeriod);
diff --git a/src/java/org/apache/cassandra/tcm/AtomicLongBackedProcessor.java b/src/java/org/apache/cassandra/tcm/AtomicLongBackedProcessor.java
index 975d581b6c..b85cd5310f 100644
--- a/src/java/org/apache/cassandra/tcm/AtomicLongBackedProcessor.java
+++ b/src/java/org/apache/cassandra/tcm/AtomicLongBackedProcessor.java
@@ -68,7 +68,7 @@ public class AtomicLongBackedProcessor extends AbstractLocalProcessor
     }

     @Override
-    public ClusterMetadata replayAndWait()
+    protected ClusterMetadata tryReplayAndWait()
     {
         return log.waitForHighestConsecutive();
     }
diff --git a/src/java/org/apache/cassandra/tcm/PaxosBackedProcessor.java b/src/java/org/apache/cassandra/tcm/PaxosBackedProcessor.java
index 2ab2f921e9..f65804b1da 100644
--- a/src/java/org/apache/cassandra/tcm/PaxosBackedProcessor.java
+++ b/src/java/org/apache/cassandra/tcm/PaxosBackedProcessor.java
@@ -55,10 +55,9 @@ public class PaxosBackedProcessor extends AbstractLocalProcessor
     }

     @Override
-    public ClusterMetadata replayAndWait()
+    protected ClusterMetadata tryReplayAndWait()
     {
-        log.waitForHighestConsecutive();
-        ClusterMetadata metadata = log.metadata();
+        ClusterMetadata metadata = log.waitForHighestConsecutive();

         Set<Replica> replicas = metadata.fullCMSMembersAsReplicas();

diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/FailedLeaveTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/FailedLeaveTest.java
index 739ec3db35..b7509ff40f 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/log/FailedLeaveTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/log/FailedLeaveTest.java
@@ -122,7 +122,7 @@ public class FailedLeaveTest extends FuzzTestBase
             Epoch currentEpoch = getClusterMetadataVersion(cmsInstance);
             Assert.assertEquals(startEpoch.getEpoch() + 2, currentEpoch.getEpoch());

-            // Node 2's leaving failed due to the streaming errors. If decommission is called again on the it should
+            // Node 2's leaving failed due to the streaming errors. If decommission is called again on the node, it should
             // resume where it left off. Allow streaming to succeed this time and verify that the node is able to
             // finish leaving.
             cluster.get(3).runOnInstance(() -> BB.failReceivingStream.set(false));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to