This is an automated email from the ASF dual-hosted git repository.
smiklosovic pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new e2a6c99310 Expose bootstrap and decommission state to nodetool info
e2a6c99310 is described below
commit e2a6c99310aa93ba3506ca8f603ae1039372f533
Author: Stefan Miklosovic <[email protected]>
AuthorDate: Wed Jun 14 17:43:50 2023 +0200
Expose bootstrap and decommission state to nodetool info
patch by Stefan Miklosovic; reviewed by Brandon Williams CASSANDRA-18555
Co-authored-by: Jaydeepkumar Chovatia <[email protected]>
---
CHANGES.txt | 1 +
NEWS.txt | 1 +
.../apache/cassandra/service/StorageService.java | 121 ++++++++----
.../cassandra/service/StorageServiceMBean.java | 19 ++
.../cassandra/tools/nodetool/Decommission.java | 10 +
.../org/apache/cassandra/tools/nodetool/Info.java | 4 +
.../distributed/test/DecommissionTest.java | 220 +++++++++++++++++++++
7 files changed, 339 insertions(+), 37 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 6437d2796d..334f829bfc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.0
+ * Expose bootstrap and decommission state to nodetool info (CASSANDRA-18555)
* Fix SSTabledump errors when dumping data from index (CASSANDRA-17698)
* Avoid unnecessary deserialization of terminal arguments when executing CQL
functions (CASSANDRA-18566)
* Remove dependency on pytz library for setting CQLSH timezones on Python
version >= 3.9 (CASSANDRA-17433)
diff --git a/NEWS.txt b/NEWS.txt
index 43e980582e..7027b6e489 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -164,6 +164,7 @@ New features
- Added `sstablepartitions` offline tool to find large partitions in
sstables.
- `cassandra-stress` has a new option called '-jmx' which enables a user
to pass username and password to JMX (CASSANDRA-18544)
- It is possible to read all credentials for `cassandra-stress` from a
file via option `-credentials-file` (CASSANDRA-18544)
+ - nodetool info displays the bootstrap state of a node (which shows whether it was
decommissioned), whether it is currently decommissioning, and whether its decommission failed (CASSANDRA-18555)
Upgrading
---------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java
b/src/java/org/apache/cassandra/service/StorageService.java
index 5f99fef4bc..1773673f61 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -263,6 +263,8 @@ import static org.apache.cassandra.net.NoPayload.noPayload;
import static org.apache.cassandra.net.Verb.REPLICATION_DONE_REQ;
import static
org.apache.cassandra.service.ActiveRepairService.ParentRepairStatus;
import static
org.apache.cassandra.service.ActiveRepairService.repairCommandExecutor;
+import static org.apache.cassandra.service.StorageService.Mode.DECOMMISSIONED;
+import static
org.apache.cassandra.service.StorageService.Mode.DECOMMISSION_FAILED;
import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
import static org.apache.cassandra.utils.Clock.Global.nanoTime;
import static
org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
@@ -414,7 +416,7 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
/* the probability for tracing any particular request, 0 disables tracing
and 1 enables for all */
private double traceProbability = 0.0;
- public enum Mode { STARTING, NORMAL, JOINING, LEAVING, DECOMMISSIONED,
MOVING, DRAINING, DRAINED }
+ /** Operation modes of this node; DECOMMISSION_FAILED is set when a decommission attempt fails, and decommission may then be retried from that mode. */
+ public enum Mode { STARTING, NORMAL, JOINING, LEAVING, DECOMMISSIONED,
DECOMMISSION_FAILED, MOVING, DRAINING, DRAINED }
private volatile Mode operationMode = Mode.STARTING;
/* Used for tracking drain progress */
@@ -626,7 +628,7 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
* they get the Gossip shutdown message, so even if
* we don't get time to broadcast this, it is not a problem.
*
- * See {@link Gossiper#markAsShutdown(InetAddressAndPort)}
+ * See Gossiper.markAsShutdown(InetAddressAndPort)
*/
private void shutdownClientServers()
{
@@ -2157,7 +2159,8 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
/**
* All MVs have been created during bootstrap, so mark them as built
*/
- private void markViewsAsBuilt() {
+ private void markViewsAsBuilt()
+ {
for (String keyspace : Schema.instance.getUserKeyspaces().names())
{
for (ViewMetadata view:
Schema.instance.getKeyspaceMetadata(keyspace).views)
@@ -2168,11 +2171,18 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
/**
* Called when bootstrap did finish successfully
*/
- private void bootstrapFinished() {
+ private void bootstrapFinished()
+ {
+ // all MVs were created during bootstrap, so record them as built before leaving bootstrap mode
markViewsAsBuilt();
isBootstrapMode = false;
}
+    /**
+     * Returns the name of this node's bootstrap state as reported by {@link SystemKeyspace#getBootstrapState()}.
+     */
+    @Override
+    public String getBootstrapState()
+    {
+        final SystemKeyspace.BootstrapState state = SystemKeyspace.getBootstrapState();
+        return state.name();
+    }
+
public boolean resumeBootstrap()
{
if (isBootstrapMode && SystemKeyspace.bootstrapInProgress())
@@ -5128,18 +5138,32 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
public void decommission(boolean force) throws InterruptedException
{
+ if (operationMode == DECOMMISSIONED)
+ {
+ logger.info("This node was already decommissioned. There is no
point in decommissioning it again.");
+ return;
+ }
+
+ if (isDecommissioning())
+ {
+ logger.info("This node is still decommissioning.");
+ return;
+ }
+
TokenMetadata metadata = tokenMetadata.cloneAfterAllLeft();
+ // there is no point to do this logic again once node was
decommissioning but failed to do so
if (operationMode != Mode.LEAVING)
{
if
(!tokenMetadata.isMember(FBUtilities.getBroadcastAddressAndPort()))
throw new UnsupportedOperationException("local node is not a
member of the token ring yet");
- if (metadata.getAllEndpoints().size() < 2)
+ if (metadata.getAllEndpoints().size() < 2 &&
metadata.getAllEndpoints().contains(FBUtilities.getBroadcastAddressAndPort()))
throw new UnsupportedOperationException("no other normal
nodes in the ring; decommission would be pointless");
- if (operationMode != Mode.NORMAL)
+ if (operationMode != Mode.NORMAL && operationMode !=
DECOMMISSION_FAILED)
throw new UnsupportedOperationException("Node in " +
operationMode + " state; wait for status to become normal or restart");
}
+
if (!isDecommissioning.compareAndSet(false, true))
- throw new IllegalStateException("Node is still decommissioning.
Check nodetool netstats.");
+ throw new IllegalStateException("Node is still decommissioning.
Check nodetool netstats or nodetool info.");
if (logger.isDebugEnabled())
logger.debug("DECOMMISSIONING");
@@ -5150,27 +5174,35 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
String dc =
DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter();
- if (operationMode != Mode.LEAVING) // If we're already
decommissioning there is no point checking RF/pending ranges
+ // If we're already decommissioning there is no point checking
RF/pending ranges
+ if (operationMode != Mode.LEAVING)
{
int rf, numNodes;
for (String keyspaceName :
Schema.instance.getNonLocalStrategyKeyspaces().names())
{
if (!force)
{
+ boolean notEnoughLiveNodes = false;
Keyspace keyspace = Keyspace.open(keyspaceName);
if (keyspace.getReplicationStrategy() instanceof
NetworkTopologyStrategy)
{
NetworkTopologyStrategy strategy =
(NetworkTopologyStrategy) keyspace.getReplicationStrategy();
rf = strategy.getReplicationFactor(dc).allReplicas;
- numNodes =
metadata.getTopology().getDatacenterEndpoints().get(dc).size();
+ Collection<InetAddressAndPort> datacenterEndpoints
= metadata.getTopology().getDatacenterEndpoints().get(dc);
+ numNodes = datacenterEndpoints.size();
+ if (numNodes <= rf &&
datacenterEndpoints.contains(FBUtilities.getBroadcastAddressAndPort()))
+ notEnoughLiveNodes = true;
}
else
{
- numNodes = metadata.getAllEndpoints().size();
+ Set<InetAddressAndPort> allEndpoints =
metadata.getAllEndpoints();
+ numNodes = allEndpoints.size();
rf =
keyspace.getReplicationStrategy().getReplicationFactor().allReplicas;
+ if (numNodes <= rf &&
allEndpoints.contains(FBUtilities.getBroadcastAddressAndPort()))
+ notEnoughLiveNodes = true;
}
- if (numNodes <= rf)
+ if (notEnoughLiveNodes)
throw new UnsupportedOperationException("Not
enough live nodes to maintain replication factor in keyspace "
+
keyspaceName + " (RF = " + rf + ", N = " + numNodes + ")."
+ "
Perform a forceful decommission to ignore.");
@@ -5182,42 +5214,48 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
}
startLeaving();
- long timeout = Math.max(RING_DELAY_MILLIS,
BatchlogManager.instance.getBatchlogTimeout());
+ long timeout = Math.max(RING_DELAY_MILLIS,
BatchlogManager.getBatchlogTimeout());
setMode(Mode.LEAVING, "sleeping " + timeout + " ms for batch
processing and pending range setup", true);
Thread.sleep(timeout);
- Runnable finishLeaving = new Runnable()
+ unbootstrap();
+
+ // shutdown cql, gossip, messaging, Stage and set state to
DECOMMISSIONED
+
+ shutdownClientServers();
+ Gossiper.instance.stop();
+ try
{
- public void run()
- {
- shutdownClientServers();
- Gossiper.instance.stop();
- try
- {
- MessagingService.instance().shutdown();
- }
- catch (IOError ioe)
- {
- logger.info("failed to shutdown message service: {}",
ioe);
- }
+ MessagingService.instance().shutdown();
+ }
+ catch (IOError ioe)
+ {
+ logger.info("failed to shutdown message service", ioe);
+ }
- Stage.shutdownNow();
-
SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.DECOMMISSIONED);
- setMode(Mode.DECOMMISSIONED, true);
- // let op be responsible for killing the process
- }
- };
- unbootstrap(finishLeaving);
+ Stage.shutdownNow();
+
SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.DECOMMISSIONED);
+ setMode(DECOMMISSIONED, true);
+ // let op be responsible for killing the process
}
catch (InterruptedException e)
{
- throw new UncheckedInterruptedException(e);
+ setMode(DECOMMISSION_FAILED, true);
+ logger.error("Node interrupted while decommissioning");
+ throw new RuntimeException("Node interrupted while
decommissioning");
}
catch (ExecutionException e)
{
- logger.error("Error while decommissioning node ", e.getCause());
+ setMode(DECOMMISSION_FAILED, true);
+ logger.error("Error while decommissioning node: {}",
e.getCause().getMessage());
throw new RuntimeException("Error while decommissioning node: " +
e.getCause().getMessage());
}
+ catch (Throwable t)
+ {
+ setMode(DECOMMISSION_FAILED, true);
+ logger.error("Error while decommissioning node: {}",
t.getMessage());
+ throw t;
+ }
finally
{
isDecommissioning.set(false);
@@ -5254,7 +5292,7 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
return () -> streamRanges(rangesToStream);
}
- private void unbootstrap(Runnable onFinish) throws ExecutionException,
InterruptedException
+ private void unbootstrap() throws ExecutionException, InterruptedException
{
Supplier<Future<StreamState>> startStreaming =
prepareUnbootstrapStreaming();
@@ -5290,7 +5328,6 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
hintsSuccess.get();
logger.debug("stream acks all received.");
leaveRing();
- onFinish.run();
}
private Future streamHints()
@@ -5610,7 +5647,17 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
+ /** Returns whether the operation mode is DECOMMISSIONED. */
public boolean isDecommissioned()
{
- return operationMode == Mode.DECOMMISSIONED;
+ return operationMode == DECOMMISSIONED;
+ }
+
+ /** Returns whether the operation mode is DECOMMISSION_FAILED, which decommission(boolean) sets when an attempt fails. */
+ public boolean isDecommissionFailed()
+ {
+ return operationMode == DECOMMISSION_FAILED;
+ }
+
+ /** Returns the isDecommissioning flag, which decommission(boolean) sets once its pre-checks pass and clears in a finally block. */
+ public boolean isDecommissioning()
+ {
+ return isDecommissioning.get();
}
public String getDrainProgress()
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 154b0d86d3..8c3428e703 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -489,6 +489,23 @@ public interface StorageServiceMBean extends
NotificationEmitter
*/
public void decommission(boolean force) throws InterruptedException;
+ /**
+ * Returns whether an attempt to decommission this node has failed.
+ *
+ * A return value of false does not imply that this node was successfully decommissioned;
+ * it may also mean that no decommission has been attempted. The failed state is held in
+ * memory only, so it is not retained across a restart of the node.
+ *
+ * @return true if decommission of this node has failed, false otherwise
+ */
+ public boolean isDecommissionFailed();
+
+ /**
+ * Returns whether a decommission of this node is currently in progress.
+ *
+ * @return true if this node is decommissioning, false otherwise
+ */
+ public boolean isDecommissioning();
+
/**
* @param newToken token to move this node to.
* This node will unload its data onto its neighbors, and bootstrap to the
new token.
@@ -1006,6 +1023,8 @@ public interface StorageServiceMBean extends
NotificationEmitter
*/
public boolean resumeBootstrap();
+ /**
+ * Returns the name of this node's bootstrap state, for example COMPLETED or DECOMMISSIONED.
+ *
+ * @return the bootstrap state name
+ */
+ public String getBootstrapState();
+
/** Gets the concurrency settings for processing stages*/
static class StageConcurrency implements Serializable
{
diff --git a/src/java/org/apache/cassandra/tools/nodetool/Decommission.java
b/src/java/org/apache/cassandra/tools/nodetool/Decommission.java
index 98b6d5846c..de70932c53 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/Decommission.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/Decommission.java
@@ -37,6 +37,16 @@ public class Decommission extends NodeToolCmd
{
try
{
+ if (probe.getStorageService().isDecommissioning())
+ {
+ probe.output().out.println("This node is still
decommissioning.");
+ return;
+ }
+ if
("DECOMMISSIONED".equals(probe.getStorageService().getBootstrapState()))
+ {
+ probe.output().out.println("Node was already decommissioned.");
+ return;
+ }
probe.decommission(force);
} catch (InterruptedException e)
{
diff --git a/src/java/org/apache/cassandra/tools/nodetool/Info.java
b/src/java/org/apache/cassandra/tools/nodetool/Info.java
index 5e0d87c767..72086fa568 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/Info.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/Info.java
@@ -177,6 +177,10 @@ public class Info extends NodeToolCmd
{
out.printf("%-23s: (node is not joined to the cluster)%n",
"Token");
}
+
+ out.printf("%-23s: %s%n", "Bootstrap state",
probe.getStorageService().getBootstrapState());
+ out.printf("%-23s: %s%n", "Decommissioning",
probe.getStorageService().isDecommissioning());
+ out.printf("%-23s: %s%n", "Decommission failed",
probe.getStorageService().isDecommissionFailed());
}
/**
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/DecommissionTest.java
b/test/distributed/org/apache/cassandra/distributed/test/DecommissionTest.java
new file mode 100644
index 0000000000..66091da3ef
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/DecommissionTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.util.concurrent.Callable;
+import java.util.function.Supplier;
+
+import org.junit.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.streaming.StreamState;
+import org.apache.cassandra.utils.concurrent.Future;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.apache.cassandra.db.SystemKeyspace.BootstrapState.COMPLETED;
+import static org.apache.cassandra.db.SystemKeyspace.BootstrapState.DECOMMISSIONED;
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.apache.cassandra.distributed.shared.ClusterUtils.stopUnchecked;
+import static org.apache.cassandra.service.StorageService.Mode.DECOMMISSION_FAILED;
+import static org.apache.cassandra.service.StorageService.Mode.NORMAL;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Verifies that a failed decommission leaves the node in {@code DECOMMISSION_FAILED} mode and that
+ * decommission can then be retried, either in the same process or after restarting the node.
+ * The failure is injected by {@link BB}, which makes the first call to
+ * {@code StorageService#prepareUnbootstrapStreaming} throw.
+ */
+public class DecommissionTest extends TestBaseImpl
+{
+    @Test
+    public void testDecommission() throws Throwable
+    {
+        try (Cluster cluster = init(Cluster.build(2)
+                                           .withConfig(config -> config.with(GOSSIP)
+                                                                       .with(NETWORK))
+                                           .withInstanceInitializer(DecommissionTest.BB::install)
+                                           .start()))
+        {
+            IInvokableInstance instance = cluster.get(1);
+
+            instance.runOnInstance(() -> {
+
+                assertEquals(COMPLETED.name(), StorageService.instance.getBootstrapState());
+
+                // pretend that decommissioning has failed in the middle; the outcome is asserted
+                // after the try block so an AssertionError is never swallowed by catch (Throwable)
+                Throwable firstAttemptFailure = null;
+                try
+                {
+                    StorageService.instance.decommission(true);
+                }
+                catch (Throwable t)
+                {
+                    firstAttemptFailure = t;
+                }
+                assertNotNull("the first attempt to decommission should fail", firstAttemptFailure);
+                assertEquals("simulated error in prepareUnbootstrapStreaming", firstAttemptFailure.getMessage());
+
+                assertFalse(StorageService.instance.isDecommissioning());
+                assertTrue(StorageService.instance.isDecommissionFailed());
+
+                // still COMPLETED, nothing has changed
+                assertEquals(COMPLETED.name(), StorageService.instance.getBootstrapState());
+
+                String operationMode = StorageService.instance.getOperationMode();
+                assertEquals(DECOMMISSION_FAILED.name(), operationMode);
+
+                // try to decommission again, now successfully; only the call itself is guarded so
+                // that the assertions below report their own failures
+                try
+                {
+                    StorageService.instance.decommission(true);
+                }
+                catch (Throwable t)
+                {
+                    throw new AssertionError("the second decommission attempt should pass but it failed on: " + t.getMessage(), t);
+                }
+
+                // decommission was successful, so the failed decommission mode was reset
+                assertFalse(StorageService.instance.isDecommissionFailed());
+                assertEquals(DECOMMISSIONED.name(), StorageService.instance.getBootstrapState());
+                assertFalse(StorageService.instance.isDecommissioning());
+
+                // check that decommissioning of an already decommissioned node has no effect
+                assertEquals(DECOMMISSIONED.name(), StorageService.instance.getBootstrapState());
+                assertFalse(StorageService.instance.isDecommissionFailed());
+
+                try
+                {
+                    StorageService.instance.decommission(true);
+                }
+                catch (Throwable t)
+                {
+                    throw new AssertionError("Decommissioning already decommissioned node should be no-op operation.", t);
+                }
+
+                assertEquals(DECOMMISSIONED.name(), StorageService.instance.getBootstrapState());
+                assertFalse(StorageService.instance.isDecommissionFailed());
+                assertFalse(StorageService.instance.isDecommissioning());
+            });
+        }
+    }
+
+    @Test
+    public void testDecommissionAfterNodeRestart() throws Throwable
+    {
+        try (Cluster cluster = init(Cluster.build(2)
+                                           .withConfig(config -> config.with(GOSSIP)
+                                                                       .with(NETWORK))
+                                           .withInstanceInitializer((classLoader, threadGroup, num, generation) -> {
+                                               // Install BB only on node 1, and not in generation 1: generations
+                                               // are counted from 0, so generation 1 is the one started by the
+                                               // restart below, and the restarted node must be able to
+                                               // decommission without the injected failure.
+                                               if (num == 1 && generation != 1)
+                                                   BB.install(classLoader, num);
+                                           })
+                                           .start()))
+        {
+            IInvokableInstance instance = cluster.get(1);
+
+            instance.runOnInstance(() -> {
+                assertEquals(COMPLETED.name(), StorageService.instance.getBootstrapState());
+
+                // pretend that decommissioning has failed in the middle; the outcome is asserted
+                // after the try block so an AssertionError is never swallowed by catch (Throwable)
+                Throwable firstAttemptFailure = null;
+                try
+                {
+                    StorageService.instance.decommission(true);
+                }
+                catch (Throwable t)
+                {
+                    firstAttemptFailure = t;
+                }
+                assertNotNull("the first attempt to decommission should fail", firstAttemptFailure);
+                assertEquals("simulated error in prepareUnbootstrapStreaming", firstAttemptFailure.getMessage());
+
+                // node is in DECOMMISSION_FAILED mode
+                String operationMode = StorageService.instance.getOperationMode();
+                assertEquals(DECOMMISSION_FAILED.name(), operationMode);
+            });
+
+            // restart the node which we failed to decommission
+            stopUnchecked(instance);
+            instance.startup();
+
+            // it is back to normal so let's decommission again
+            String operationModeAfterRestart = instance.callOnInstance(() -> StorageService.instance.getOperationMode());
+            assertEquals(NORMAL.name(), operationModeAfterRestart);
+
+            instance.runOnInstance(() -> {
+                try
+                {
+                    StorageService.instance.decommission(true);
+                }
+                catch (InterruptedException e)
+                {
+                    Thread.currentThread().interrupt();
+                    throw new AssertionError("Should decommission the node", e);
+                }
+
+                assertEquals(DECOMMISSIONED.name(), StorageService.instance.getBootstrapState());
+                assertFalse(StorageService.instance.isDecommissionFailed());
+                assertFalse(StorageService.instance.isDecommissioning());
+            });
+        }
+    }
+
+    /**
+     * Byte Buddy interceptor for {@code StorageService#prepareUnbootstrapStreaming}: the first invocation
+     * throws to simulate a decommission failing midway, later invocations delegate to the real method.
+     */
+    public static class BB
+    {
+        public static void install(ClassLoader classLoader, Integer num)
+        {
+            new ByteBuddy().rebase(StorageService.class)
+                           .method(named("prepareUnbootstrapStreaming"))
+                           .intercept(MethodDelegation.to(DecommissionTest.BB.class))
+                           .make()
+                           .load(classLoader, ClassLoadingStrategy.Default.INJECTION);
+        }
+
+        // counts calls to the intercepted method; only the first call fails
+        private static int invocations = 0;
+
+        @SuppressWarnings("unused")
+        public static Supplier<Future<StreamState>> prepareUnbootstrapStreaming(@SuperCall Callable<Supplier<Future<StreamState>>> zuper)
+        {
+            ++invocations;
+
+            if (invocations == 1)
+                throw new RuntimeException("simulated error in prepareUnbootstrapStreaming");
+
+            try
+            {
+                return zuper.call();
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]