This is an automated email from the ASF dual-hosted git repository.
samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new a27e09930d Nodetool cms reconfigure reports streaming failures
correctly
a27e09930d is described below
commit a27e09930dea92574edd48a0c7a6098af4e7c081
Author: Sam Tunnicliffe <[email protected]>
AuthorDate: Tue Oct 1 15:19:46 2024 +0100
Nodetool cms reconfigure reports streaming failures correctly
Patch by Sam Tunnicliffe; reviewed by Marcus Eriksson for
CASSANDRA-19972
---
CHANGES.txt | 1 +
.../cassandra/tcm/sequences/ReconfigureCMS.java | 6 +-
.../log/ReconfigureCMSStreamingFailureTest.java | 100 +++++++++++++++++++++
3 files changed, 105 insertions(+), 2 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index d5ec59c1ec..94c5a9835f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.1
+ * Nodetool reconfigure cms has correct return code when streaming fails
(CASSANDRA-19972)
* Reintroduce RestrictionSet#iterator() optimization around multi-column
restrictions (CASSANDRA-20034)
* Explicitly localize strings to Locale.US for internal implementation
(CASSANDRA-19953)
* Add -H option for human-friendly output in nodetool compactionhistory
(CASSANDRA-20015)
diff --git a/src/java/org/apache/cassandra/tcm/sequences/ReconfigureCMS.java
b/src/java/org/apache/cassandra/tcm/sequences/ReconfigureCMS.java
index fb6d9998c4..57b5e68e80 100644
--- a/src/java/org/apache/cassandra/tcm/sequences/ReconfigureCMS.java
+++ b/src/java/org/apache/cassandra/tcm/sequences/ReconfigureCMS.java
@@ -182,8 +182,10 @@ public class ReconfigureCMS extends
MultiStepOperation<AdvanceCMSReconfiguration
}
catch (Throwable t)
{
- logger.error("Could not finish adding the node to the Cluster
Metadata Service", t);
- return SequenceState.blocked();
+ String message = "Some data streaming failed. Use nodetool to
check CMS reconfiguration state and resume. " +
+ "For more, see `nodetool help cms reconfigure`.";
+ logger.warn(message);
+ return SequenceState.error(new RuntimeException(message));
}
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/log/ReconfigureCMSStreamingFailureTest.java
b/test/distributed/org/apache/cassandra/distributed/test/log/ReconfigureCMSStreamingFailureTest.java
new file mode 100644
index 0000000000..e0364736e6
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/log/ReconfigureCMSStreamingFailureTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.log;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.sequences.ReconfigureCMS;
+import org.apache.cassandra.tcm.transformations.cms.PrepareCMSReconfiguration;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+
+public class ReconfigureCMSStreamingFailureTest extends TestBaseImpl
+{
+ @Test
+ public void testNodetoolFailureWhenStreamingErrorOccurs() throws
IOException, ExecutionException, InterruptedException
+ {
+ try (Cluster cluster = init(builder().withNodes(3)
+ .withConfig(c ->
c.with(Feature.NETWORK))
+
.withInstanceInitializer(BB::install)
+ .start()))
+ {
+ cluster.forEach(i -> i.runOnInstance(() ->
BB.failStreaming.set(true)));
+ cluster.get(1)
+ .nodetoolResult("cms", "reconfigure", "3")
+ .asserts()
+ .failure()
+ .stderrContains("Some data streaming failed. Use nodetool
to check CMS reconfiguration state and resume.");
+ String status = cluster.get(1).nodetoolResult("cms",
"reconfigure", "--status").getStdout();
+ assertTrue(status.contains("ACTIVE: [/127.0.0"));
+ assertTrue(status.contains("ADDITIONS: [/127.0.0"));
+ cluster.forEach(i -> assertTrue(i.callOnInstance(() ->
PrepareCMSReconfiguration.needsReconfiguration(ClusterMetadata.current()))));
+
+ cluster.forEach(i -> i.runOnInstance(() ->
BB.failStreaming.set(false)));
+ cluster.get(1).nodetoolResult("cms", "reconfigure",
"--resume").asserts().success();
+ cluster.forEach(i -> assertFalse(i.callOnInstance(() ->
PrepareCMSReconfiguration.needsReconfiguration(ClusterMetadata.current()))));
+ assertTrue(cluster.get(1).callOnInstance(() -> {
+ ClusterMetadata metadata = ClusterMetadata.current();
+ return
metadata.fullCMSMemberIds().containsAll(metadata.directory.peerIds());
+ }));
+ }
+ }
+
+ public static class BB
+ {
+ public static AtomicBoolean failStreaming = new AtomicBoolean(false);
+ public static void install(ClassLoader cl, int i)
+ {
+ new ByteBuddy().rebase(ReconfigureCMS.class)
+ .method(named("streamRanges"))
+ .intercept(MethodDelegation.to(BB.class))
+ .make()
+ .load(cl, ClassLoadingStrategy.Default.INJECTION);
+ }
+
+ public static void streamRanges(Replica replicaForStreaming,
+ Set<InetAddressAndPort>
streamCandidates,
+ @SuperCall Callable<Void> zuper)
throws Exception
+ {
+ if (failStreaming.get())
+ throw new IOException("failed to connect to " +
replicaForStreaming.endpoint() + " for streaming data");
+
+ zuper.call();
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]