This is an automated email from the ASF dual-hosted git repository. asf-gitbox-commits pushed a commit to branch cassandra-6.0 in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit af5eed2a3277b747a1a4a2b7bb18dce39fb167d5 Author: Marcus Eriksson <[email protected]> AuthorDate: Mon Mar 16 15:04:41 2026 +0100 Handle lost response when committing PrepareMove Patch by marcuse; reviewed by Sam Tunnicliffe for CASSANDRA-21222 --- CHANGES.txt | 1 + src/java/org/apache/cassandra/tcm/Startup.java | 2 + .../tcm/sequences/SingleNodeSequences.java | 40 +++++- .../tcm/sequences/UnbootstrapAndLeave.java | 15 +-- .../distributed/test/tcm/LostCommitReqResTest.java | 135 +++++++++++++++++++++ 5 files changed, 180 insertions(+), 13 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 86f663dfdf..fcb0f59a8d 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 6.0-alpha2 + * Handle lost response when committing PrepareMove (CASSANDRA-21222) * SEPExecutor.maybeExecuteImmediately does not always execute tasks immediately despite available worker capacity (CASSANDRA-21429) * Safely regain ranges and delete retired command stores (CASSANDRA-21212) * Reduce memory allocations in miscellaneous places along read path (CASSANDRA-21360) diff --git a/src/java/org/apache/cassandra/tcm/Startup.java b/src/java/org/apache/cassandra/tcm/Startup.java index 970ea190c6..2bfe4b4cc9 100644 --- a/src/java/org/apache/cassandra/tcm/Startup.java +++ b/src/java/org/apache/cassandra/tcm/Startup.java @@ -449,6 +449,8 @@ import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort; if (isReplacing) ReconfigureCMS.maybeReconfigureCMS(metadata, DatabaseDescriptor.getReplaceAddress()); + // if this throws startup is aborted and operator needs to restart, in that case the IPS is resumed if + // it was successfully committed ClusterMetadataService.instance().commit(initialTransformation.get()); // When Accord starts up it needs to check for any historic epochs that it needs to know about (in order // to handle pending transactions), in order to know what nodes to check with it needs to know what the diff --git a/src/java/org/apache/cassandra/tcm/sequences/SingleNodeSequences.java b/src/java/org/apache/cassandra/tcm/sequences/SingleNodeSequences.java index 3a2fd2221a..313700aaaf 100644 --- a/src/java/org/apache/cassandra/tcm/sequences/SingleNodeSequences.java +++ b/src/java/org/apache/cassandra/tcm/sequences/SingleNodeSequences.java @@ -85,7 +85,9 @@ public interface SingleNodeSequences ClusterMetadataService.instance().commit(new PrepareLeave(self, force, ClusterMetadataService.instance().placementProvider(), - LeaveStreams.Kind.UNBOOTSTRAP)); + LeaveStreams.Kind.UNBOOTSTRAP), + m -> m, + failureHandler("PrepareLeave", StorageService.instance::markDecommissionFailed)); } else if (InProgressSequences.isLeave(inProgress)) { @@ -182,13 +184,24 @@ public interface SingleNodeSequences ClusterMetadataService.instance().commit(new PrepareMove(self, Collections.singleton(newToken), ClusterMetadataService.instance().placementProvider(), - true)); + true), + m -> m, + failureHandler("PrepareMove", StorageService.instance::markMoveFailed)); InProgressSequences.finishInProgressSequences(self); if (logger.isDebugEnabled()) logger.debug("Successfully moved to new token {}", StorageService.instance.getLocalTokens().iterator().next()); } + private static ClusterMetadataService.CommitFailureHandler<ClusterMetadata> failureHandler(String type, Runnable markFailed) + { + return (code, msg) -> { + logger.warn("Got failure committing {} transformation: {} {}", type, code, msg); + markFailed.run(); + throw new IllegalStateException(String.format("Can not commit transformation: \"%s\"(%s).", code, msg)); + }; + } + static void resumeMove() { if (ClusterMetadataService.instance().isMigrating() || ClusterMetadataService.state() == ClusterMetadataService.State.GOSSIP) @@ -201,6 +214,11 @@ public interface SingleNodeSequences { String msg = "No move operation in progress, can't resume"; logger.info(msg); + if (StorageService.instance.operationMode() == MOVE_FAILED) + { + // there is no ongoing move to resume, but operation mode thinks there is + StorageService.instance.clearTransientMode(); + } throw new IllegalStateException(msg); } if (StorageService.instance.operationMode() != MOVE_FAILED) @@ -221,7 +239,7 @@ public interface SingleNodeSequences /** * * @param nodeId node id to abort the MSO for, null for local node - * @param kind the expected kind of the multi step operation to abolt + * @param kind the expected kind of the multi step operation to abort * @param ssMode the legacy mode we want storage service to be in, null for any */ private static void abortHelper(@Nullable String nodeId, MultiStepOperation.Kind kind, @Nullable StorageService.Mode ssMode) @@ -234,9 +252,19 @@ public interface SingleNodeSequences MultiStepOperation<?> sequence = metadata.inProgressSequences.get(toAbort); if (sequence == null || sequence.kind() != kind) { - String msg = String.format("No %s operation in progress for %s, can't abort (%s)", kind, toAbort, sequence); - logger.info(msg); - throw new IllegalStateException(msg); + if (toAbort.equals(metadata.myNodeId()) && ssMode != null && StorageService.instance.operationMode() == ssMode) + { + // there is no ongoing sequence with the given kind, but storage service operation mode is set, clear it + logger.debug("There is no ongoing {} sequence for this node, but operation mode is {} - clearing transient mode", kind, ssMode); + StorageService.instance.clearTransientMode(); + return; + } + else + { + String msg = String.format("No %s operation in progress for %s, can't abort (%s)", kind, toAbort, sequence); + logger.info(msg); + throw new IllegalStateException(msg); + } } if (toAbort.equals(metadata.myNodeId())) { diff --git a/src/java/org/apache/cassandra/tcm/sequences/UnbootstrapAndLeave.java b/src/java/org/apache/cassandra/tcm/sequences/UnbootstrapAndLeave.java index da8ae74e6a..deca30ab92 100644 --- a/src/java/org/apache/cassandra/tcm/sequences/UnbootstrapAndLeave.java +++ b/src/java/org/apache/cassandra/tcm/sequences/UnbootstrapAndLeave.java @@ -92,12 +92,12 @@ public class UnbootstrapAndLeave extends MultiStepOperation<Epoch> */ @VisibleForTesting UnbootstrapAndLeave(Epoch latestModification, - LockedRanges.Key lockKey, - Transformation.Kind next, - PrepareLeave.StartLeave startLeave, - PrepareLeave.MidLeave midLeave, - PrepareLeave.FinishLeave finishLeave, - LeaveStreams streams) + LockedRanges.Key lockKey, + Transformation.Kind next, + PrepareLeave.StartLeave startLeave, + PrepareLeave.MidLeave midLeave, + PrepareLeave.FinishLeave finishLeave, + LeaveStreams streams) { super(nextToIndex(next), latestModification); this.lockKey = lockKey; @@ -198,7 +198,8 @@ public class UnbootstrapAndLeave extends MultiStepOperation<Epoch> } catch (ExecutionException e) { - StorageService.instance.markDecommissionFailed(); + if (startLeave.nodeId().equals(ClusterMetadata.current().myNodeId())) + StorageService.instance.markDecommissionFailed(); JVMStabilityInspector.inspectThrowable(e); logger.error("Error while decommissioning node: {}", e.getCause().getMessage()); throw new RuntimeException("Error while decommissioning node: " + e.getCause().getMessage()); diff --git a/test/distributed/org/apache/cassandra/distributed/test/tcm/LostCommitReqResTest.java b/test/distributed/org/apache/cassandra/distributed/test/tcm/LostCommitReqResTest.java new file mode 100644 index 0000000000..dfa3512d09 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/tcm/LostCommitReqResTest.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.tcm; + +import java.io.IOException; + +import org.junit.Test; + +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.net.Verb; +import org.apache.cassandra.service.StorageService; + +import static org.junit.Assert.assertEquals; + +public class LostCommitReqResTest extends TestBaseImpl +{ + @Test + public void lostMoveCommitResponseTest() throws IOException + { + try (Cluster cluster = init(builder().withNodes(2) + .withConfig(c -> c.with(Feature.NETWORK, Feature.GOSSIP).set("cms_await_timeout", "1s").set("cms_default_max_retries", "5")) + .start())) + { + // no commit responses + cluster.filters().verbs(Verb.TCM_COMMIT_RSP.id).from(1).to(2).drop(); + // lost response when committing PrepareMove, fails the nodetool command and halts progress + cluster.get(2).nodetoolResult("move", "1234").asserts().failure(); + assertMoveFailed(cluster.get(2)); // we should be in MOVE_FAILED state to allow abortmove + // still no responses, committing CancelInProgressSequence response is lost, but is actually committed + cluster.get(2).nodetoolResult("abortmove").asserts().failure(); + assertNormal(cluster.get(2)); // and we should be back to normal + cluster.get(2).nodetoolResult("move", "1234").asserts().failure(); + assertMoveFailed(cluster.get(2)); + // finishing the MSO does not depend on any commit responses, just that ClusterMetadata.current() is up to date, so this is successful; + cluster.get(2).nodetoolResult("move", "--resume").asserts().success(); + assertNormal(cluster.get(2)); + } + } + + @Test + public void lostMoveCommitRequestTest() throws IOException + { + try (Cluster cluster = init(builder().withNodes(2) + .withConfig(c -> c.with(Feature.NETWORK, Feature.GOSSIP).set("cms_await_timeout", "1s").set("cms_default_max_retries", "5")) + .start())) + { + // no commit requests + cluster.filters().verbs(Verb.TCM_COMMIT_REQ.id).from(2).to(1).drop(); + cluster.get(2).nodetoolResult("move", "1234").asserts().failure(); + // state should be "move failed" since we don't know if the request or response went missing: + assertMoveFailed(cluster.get(2)); + cluster.filters().reset(); + // abort move should be successful, it only clears the transient state in this case though + cluster.get(2).nodetoolResult("abortmove").asserts().success(); + assertNormal(cluster.get(2)); + cluster.get(2).nodetoolResult("move", "1234").asserts().success(); + assertNormal(cluster.get(2)); + } + } + + @Test + public void lostDecomCommitResponseTest() throws IOException + { + try (Cluster cluster = init(builder().withNodes(2) + .withConfig(c -> c.with(Feature.NETWORK, Feature.GOSSIP).set("cms_await_timeout", "1s").set("cms_default_max_retries", "5")) + .start())) + { + cluster.filters().verbs(Verb.TCM_COMMIT_RSP.id).from(1).to(2).drop(); + cluster.get(2).nodetoolResult("decommission", "--force").asserts().failure(); + assertDecomFailed(cluster.get(2)); + cluster.get(2).nodetoolResult("abortdecommission").asserts().failure(); + assertNormal(cluster.get(2)); + cluster.get(2).nodetoolResult("decommission", "--force").asserts().failure(); + assertDecomFailed(cluster.get(2)); + cluster.get(2).nodetoolResult("decommission").asserts().success(); + } + } + + @Test + public void lostDecomCommitRequestTest() throws IOException + { + try (Cluster cluster = init(builder().withNodes(2) + .withConfig(c -> c.with(Feature.NETWORK, Feature.GOSSIP).set("cms_await_timeout", "1s").set("cms_default_max_retries", "5")) + .start())) + { + cluster.filters().verbs(Verb.TCM_COMMIT_REQ.id).from(2).to(1).drop(); + cluster.get(2).nodetoolResult("decommission", "--force").asserts().failure(); + assertDecomFailed(cluster.get(2)); + cluster.filters().reset(); + cluster.get(2).nodetoolResult("abortdecommission").asserts().success(); + assertNormal(cluster.get(2)); + cluster.get(2).nodetoolResult("decommission", "--force").asserts().success(); + } + } + + private static void assertNormal(IInvokableInstance i) + { + assertOperationMode(i, StorageService.Mode.NORMAL); + } + + private static void assertMoveFailed(IInvokableInstance i) + { + assertOperationMode(i, StorageService.Mode.MOVE_FAILED); + } + + private static void assertDecomFailed(IInvokableInstance i) + { + assertOperationMode(i, StorageService.Mode.DECOMMISSION_FAILED); + } + + private static void assertOperationMode(IInvokableInstance i, StorageService.Mode expectedMode) + { + String mode = i.callOnInstance(() -> StorageService.instance.operationMode().toString()); + assertEquals(expectedMode.toString(), mode); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
