This is an automated email from the ASF dual-hosted git repository.

asf-gitbox-commits pushed a commit to branch cassandra-6.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit af5eed2a3277b747a1a4a2b7bb18dce39fb167d5
Author: Marcus Eriksson <[email protected]>
AuthorDate: Mon Mar 16 15:04:41 2026 +0100

    Handle lost response when committing PrepareMove
    
    Patch by marcuse; reviewed by Sam Tunnicliffe for CASSANDRA-21222
---
 CHANGES.txt                                        |   1 +
 src/java/org/apache/cassandra/tcm/Startup.java     |   2 +
 .../tcm/sequences/SingleNodeSequences.java         |  40 +++++-
 .../tcm/sequences/UnbootstrapAndLeave.java         |  15 +--
 .../distributed/test/tcm/LostCommitReqResTest.java | 135 +++++++++++++++++++++
 5 files changed, 180 insertions(+), 13 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 86f663dfdf..fcb0f59a8d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 6.0-alpha2
+ * Handle lost response when committing PrepareMove (CASSANDRA-21222)
  * SEPExecutor.maybeExecuteImmediately does not always execute tasks immediately despite available worker capacity (CASSANDRA-21429)
  * Safely regain ranges and delete retired command stores (CASSANDRA-21212)
  * Reduce memory allocations in miscellaneous places along read path (CASSANDRA-21360)
diff --git a/src/java/org/apache/cassandra/tcm/Startup.java b/src/java/org/apache/cassandra/tcm/Startup.java
index 970ea190c6..2bfe4b4cc9 100644
--- a/src/java/org/apache/cassandra/tcm/Startup.java
+++ b/src/java/org/apache/cassandra/tcm/Startup.java
@@ -449,6 +449,8 @@ import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
                 if (isReplacing)
                     ReconfigureCMS.maybeReconfigureCMS(metadata, 
DatabaseDescriptor.getReplaceAddress());
 
+                // if this throws startup is aborted and operator needs to restart, in that case the IPS is resumed if
+                // it was successfully committed
                 
ClusterMetadataService.instance().commit(initialTransformation.get());
                 // When Accord starts up it needs to check for any historic epochs that it needs to know about (in order
                 // to handle pending transactions), in order to know what nodes to check with it needs to know what the
diff --git a/src/java/org/apache/cassandra/tcm/sequences/SingleNodeSequences.java b/src/java/org/apache/cassandra/tcm/sequences/SingleNodeSequences.java
index 3a2fd2221a..313700aaaf 100644
--- a/src/java/org/apache/cassandra/tcm/sequences/SingleNodeSequences.java
+++ b/src/java/org/apache/cassandra/tcm/sequences/SingleNodeSequences.java
@@ -85,7 +85,9 @@ public interface SingleNodeSequences
             ClusterMetadataService.instance().commit(new PrepareLeave(self,
                                                                       force,
                                                                       
ClusterMetadataService.instance().placementProvider(),
-                                                                      
LeaveStreams.Kind.UNBOOTSTRAP));
+                                                                      
LeaveStreams.Kind.UNBOOTSTRAP),
+                                                     m -> m,
+                                                     
failureHandler("PrepareLeave", 
StorageService.instance::markDecommissionFailed));
         }
         else if (InProgressSequences.isLeave(inProgress))
         {
@@ -182,13 +184,24 @@ public interface SingleNodeSequences
         ClusterMetadataService.instance().commit(new PrepareMove(self,
                                                                  
Collections.singleton(newToken),
                                                                  
ClusterMetadataService.instance().placementProvider(),
-                                                                 true));
+                                                                 true),
+                                                 m -> m,
+                                                 failureHandler("PrepareMove", 
StorageService.instance::markMoveFailed));
         InProgressSequences.finishInProgressSequences(self);
 
         if (logger.isDebugEnabled())
             logger.debug("Successfully moved to new token {}", 
StorageService.instance.getLocalTokens().iterator().next());
     }
 
+    private static 
ClusterMetadataService.CommitFailureHandler<ClusterMetadata> 
failureHandler(String type, Runnable markFailed)
+    {
+        return (code, msg) -> {
+            logger.warn("Got failure committing {} transformation: {} {}", 
type, code, msg);
+            markFailed.run();
+            throw new IllegalStateException(String.format("Can not commit 
transformation: \"%s\"(%s).", code, msg));
+        };
+    }
+
     static void resumeMove()
     {
         if (ClusterMetadataService.instance().isMigrating() || 
ClusterMetadataService.state() == ClusterMetadataService.State.GOSSIP)
@@ -201,6 +214,11 @@ public interface SingleNodeSequences
         {
             String msg = "No move operation in progress, can't resume";
             logger.info(msg);
+            if (StorageService.instance.operationMode() == MOVE_FAILED)
+            {
+                // there is no ongoing move to resume, but operation mode thinks there is
+                StorageService.instance.clearTransientMode();
+            }
             throw new IllegalStateException(msg);
         }
         if (StorageService.instance.operationMode() != MOVE_FAILED)
@@ -221,7 +239,7 @@ public interface SingleNodeSequences
     /**
      *
      * @param nodeId node id to abort the MSO for, null for local node
-     * @param kind the expected kind of the multi step operation to abolt
+     * @param kind the expected kind of the multi step operation to abort
      * @param ssMode the legacy mode we want storage service to be in, null for any
      */
     private static void abortHelper(@Nullable String nodeId, 
MultiStepOperation.Kind kind, @Nullable StorageService.Mode ssMode)
@@ -234,9 +252,19 @@ public interface SingleNodeSequences
         MultiStepOperation<?> sequence = 
metadata.inProgressSequences.get(toAbort);
         if (sequence == null || sequence.kind() != kind)
         {
-            String msg = String.format("No %s operation in progress for %s, 
can't abort (%s)", kind, toAbort, sequence);
-            logger.info(msg);
-            throw new IllegalStateException(msg);
+            if (toAbort.equals(metadata.myNodeId()) && ssMode != null && 
StorageService.instance.operationMode() == ssMode)
+            {
+                // there is no ongoing sequence with the given kind, but storage service operation mode is set, clear it
+                logger.debug("There is no ongoing {} sequence for this node, 
but operation mode is {} - clearing transient mode", kind, ssMode);
+                StorageService.instance.clearTransientMode();
+                return;
+            }
+            else
+            {
+                String msg = String.format("No %s operation in progress for 
%s, can't abort (%s)", kind, toAbort, sequence);
+                logger.info(msg);
+                throw new IllegalStateException(msg);
+            }
         }
         if (toAbort.equals(metadata.myNodeId()))
         {
diff --git a/src/java/org/apache/cassandra/tcm/sequences/UnbootstrapAndLeave.java b/src/java/org/apache/cassandra/tcm/sequences/UnbootstrapAndLeave.java
index da8ae74e6a..deca30ab92 100644
--- a/src/java/org/apache/cassandra/tcm/sequences/UnbootstrapAndLeave.java
+++ b/src/java/org/apache/cassandra/tcm/sequences/UnbootstrapAndLeave.java
@@ -92,12 +92,12 @@ public class UnbootstrapAndLeave extends MultiStepOperation<Epoch>
      */
     @VisibleForTesting
     UnbootstrapAndLeave(Epoch latestModification,
-                               LockedRanges.Key lockKey,
-                               Transformation.Kind next,
-                               PrepareLeave.StartLeave startLeave,
-                               PrepareLeave.MidLeave midLeave,
-                               PrepareLeave.FinishLeave finishLeave,
-                               LeaveStreams streams)
+                        LockedRanges.Key lockKey,
+                        Transformation.Kind next,
+                        PrepareLeave.StartLeave startLeave,
+                        PrepareLeave.MidLeave midLeave,
+                        PrepareLeave.FinishLeave finishLeave,
+                        LeaveStreams streams)
     {
         super(nextToIndex(next), latestModification);
         this.lockKey = lockKey;
@@ -198,7 +198,8 @@ public class UnbootstrapAndLeave extends MultiStepOperation<Epoch>
                 }
                 catch (ExecutionException e)
                 {
-                    StorageService.instance.markDecommissionFailed();
+                    if 
(startLeave.nodeId().equals(ClusterMetadata.current().myNodeId()))
+                        StorageService.instance.markDecommissionFailed();
                     JVMStabilityInspector.inspectThrowable(e);
                     logger.error("Error while decommissioning node: {}", 
e.getCause().getMessage());
                     throw new RuntimeException("Error while decommissioning 
node: " + e.getCause().getMessage());
diff --git a/test/distributed/org/apache/cassandra/distributed/test/tcm/LostCommitReqResTest.java b/test/distributed/org/apache/cassandra/distributed/test/tcm/LostCommitReqResTest.java
new file mode 100644
index 0000000000..dfa3512d09
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/tcm/LostCommitReqResTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.tcm;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.service.StorageService;
+
+import static org.junit.Assert.assertEquals;
+
+public class LostCommitReqResTest extends TestBaseImpl
+{
+    @Test
+    public void lostMoveCommitResponseTest() throws IOException
+    {
+        try (Cluster cluster = init(builder().withNodes(2)
+                                             .withConfig(c -> 
c.with(Feature.NETWORK, Feature.GOSSIP).set("cms_await_timeout", 
"1s").set("cms_default_max_retries", "5"))
+                                             .start()))
+        {
+            // no commit responses
+            
cluster.filters().verbs(Verb.TCM_COMMIT_RSP.id).from(1).to(2).drop();
+            // lost response when committing PrepareMove, fails the nodetool command and halts progress
+            cluster.get(2).nodetoolResult("move", "1234").asserts().failure();
+            assertMoveFailed(cluster.get(2)); // we should be in MOVE_FAILED state to allow abortmove
+            // still no responses, committing CancelInProgressSequence response is lost, but is actually committed
+            cluster.get(2).nodetoolResult("abortmove").asserts().failure();
+            assertNormal(cluster.get(2)); // and we should be back to normal
+            cluster.get(2).nodetoolResult("move", "1234").asserts().failure();
+            assertMoveFailed(cluster.get(2));
+            // finishing the MSO does not depend on any commit responses, just that ClusterMetadata.current() is up to date, so this is successful;
+            cluster.get(2).nodetoolResult("move", 
"--resume").asserts().success();
+            assertNormal(cluster.get(2));
+        }
+    }
+
+    @Test
+    public void lostMoveCommitRequestTest() throws IOException
+    {
+        try (Cluster cluster = init(builder().withNodes(2)
+                                             .withConfig(c -> 
c.with(Feature.NETWORK, Feature.GOSSIP).set("cms_await_timeout", 
"1s").set("cms_default_max_retries", "5"))
+                                             .start()))
+        {
+            // no commit requests
+            
cluster.filters().verbs(Verb.TCM_COMMIT_REQ.id).from(2).to(1).drop();
+            cluster.get(2).nodetoolResult("move", "1234").asserts().failure();
+            // state should be "move failed" since we don't know if the request or response went missing:
+            assertMoveFailed(cluster.get(2));
+            cluster.filters().reset();
+            // abort move should be successful, it only clears the transient state in this case though
+            cluster.get(2).nodetoolResult("abortmove").asserts().success();
+            assertNormal(cluster.get(2));
+            cluster.get(2).nodetoolResult("move", "1234").asserts().success();
+            assertNormal(cluster.get(2));
+        }
+    }
+
+    @Test
+    public void lostDecomCommitResponseTest() throws IOException
+    {
+        try (Cluster cluster = init(builder().withNodes(2)
+                                             .withConfig(c -> 
c.with(Feature.NETWORK, Feature.GOSSIP).set("cms_await_timeout", 
"1s").set("cms_default_max_retries", "5"))
+                                             .start()))
+        {
+            
cluster.filters().verbs(Verb.TCM_COMMIT_RSP.id).from(1).to(2).drop();
+            cluster.get(2).nodetoolResult("decommission", 
"--force").asserts().failure();
+            assertDecomFailed(cluster.get(2));
+            
cluster.get(2).nodetoolResult("abortdecommission").asserts().failure();
+            assertNormal(cluster.get(2));
+            cluster.get(2).nodetoolResult("decommission", 
"--force").asserts().failure();
+            assertDecomFailed(cluster.get(2));
+            cluster.get(2).nodetoolResult("decommission").asserts().success();
+        }
+    }
+
+    @Test
+    public void lostDecomCommitRequestTest() throws IOException
+    {
+        try (Cluster cluster = init(builder().withNodes(2)
+                                             .withConfig(c -> 
c.with(Feature.NETWORK, Feature.GOSSIP).set("cms_await_timeout", 
"1s").set("cms_default_max_retries", "5"))
+                                             .start()))
+        {
+            
cluster.filters().verbs(Verb.TCM_COMMIT_REQ.id).from(2).to(1).drop();
+            cluster.get(2).nodetoolResult("decommission", 
"--force").asserts().failure();
+            assertDecomFailed(cluster.get(2));
+            cluster.filters().reset();
+            
cluster.get(2).nodetoolResult("abortdecommission").asserts().success();
+            assertNormal(cluster.get(2));
+            cluster.get(2).nodetoolResult("decommission", 
"--force").asserts().success();
+        }
+    }
+
+    private static void assertNormal(IInvokableInstance i)
+    {
+        assertOperationMode(i, StorageService.Mode.NORMAL);
+    }
+
+    private static void assertMoveFailed(IInvokableInstance i)
+    {
+        assertOperationMode(i, StorageService.Mode.MOVE_FAILED);
+    }
+
+    private static void assertDecomFailed(IInvokableInstance i)
+    {
+        assertOperationMode(i, StorageService.Mode.DECOMMISSION_FAILED);
+    }
+
+    private static void assertOperationMode(IInvokableInstance i, 
StorageService.Mode expectedMode)
+    {
+        String mode = i.callOnInstance(() -> 
StorageService.instance.operationMode().toString());
+        assertEquals(expectedMode.toString(), mode);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to