This is an automated email from the ASF dual-hosted git repository.
paulo pushed a commit to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-4.0 by this push:
new 8bb9c72f58 Add safeguard so cleanup fails when node has pending ranges
8bb9c72f58 is described below
commit 8bb9c72f582de6bcc39522ba9ade91fd5bc22f67
Author: lzurovchak1 <[email protected]>
AuthorDate: Wed Dec 21 13:34:18 2022 -0500
Add safeguard so cleanup fails when node has pending ranges
Patch by Lindsey Zurovchak; Reviewed by Paulo Motta, Stefan Miklosovic for
CASSANDRA-16418
Closes #2061
---
CHANGES.txt | 1 +
.../cassandra/db/compaction/CompactionManager.java | 6 +-
.../apache/cassandra/service/StorageService.java | 4 +
.../cassandra/distributed/action/GossipHelper.java | 12 +++
.../distributed/test/ring/CleanupFailureTest.java | 111 +++++++++++++++++++++
test/unit/org/apache/cassandra/db/CleanupTest.java | 6 +-
6 files changed, 133 insertions(+), 7 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index a75ee17128..789d8f7fa8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0.8
+ * Add safeguard so cleanup fails when node has pending ranges
(CASSANDRA-16418)
* Fix legacy clustering serialization for paging with compact storage
(CASSANDRA-17507)
* Add support for python 3.11 (CASSANDRA-18088)
* Fix formatting of duration in cqlsh (CASSANDRA-18141)
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 799eed3d0d..a5277356de 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -515,11 +515,7 @@ public class CompactionManager implements
CompactionManagerMBean
{
assert !cfStore.isIndex();
Keyspace keyspace = cfStore.keyspace;
- if (!StorageService.instance.isJoined())
- {
- logger.info("Cleanup cannot run before a node has joined the
ring");
- return AllSSTableOpStatus.ABORTED;
- }
+
// if local ranges is empty, it means no data should remain
final RangesAtEndpoint replicas =
StorageService.instance.getLocalReplicas(keyspace.getName());
final Set<Range<Token>> allRanges = replicas.ranges();
diff --git a/src/java/org/apache/cassandra/service/StorageService.java
b/src/java/org/apache/cassandra/service/StorageService.java
index b70347a301..343f435f11 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -132,6 +132,7 @@ import static
org.apache.cassandra.index.SecondaryIndexManager.isIndexColumnFami
import static org.apache.cassandra.net.NoPayload.noPayload;
import static org.apache.cassandra.net.Verb.REPLICATION_DONE_REQ;
import static
org.apache.cassandra.schema.MigrationManager.evolveSystemKeyspace;
+import static
org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
/**
* This abstraction contains the token/identifier of this node
@@ -3634,6 +3635,9 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
if (SchemaConstants.isLocalSystemKeyspace(keyspaceName))
throw new RuntimeException("Cleanup of the system keyspace is
neither necessary nor wise");
+ if (tokenMetadata.getPendingRanges(keyspaceName,
getBroadcastAddressAndPort()).size() > 0)
+ throw new RuntimeException("Node is involved in cluster membership
changes. Not safe to run cleanup.");
+
CompactionManager.AllSSTableOpStatus status =
CompactionManager.AllSSTableOpStatus.SUCCESSFUL;
for (ColumnFamilyStore cfStore : getValidColumnFamilies(false, false,
keyspaceName, tables))
{
diff --git
a/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java
b/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java
index f4b5838a86..4f0d343f7b 100644
--- a/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java
+++ b/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java
@@ -66,6 +66,18 @@ public class GossipHelper
};
}
+ public static InstanceAction statusToDecommission(IInvokableInstance
newNode)
+ {
+ return (instance) ->
+ {
+ changeGossipState(instance,
+ newNode,
+ Arrays.asList(tokens(newNode),
+ statusLeaving(newNode),
+ statusWithPortLeaving(newNode)));
+ };
+ }
+
public static InstanceAction statusToNormal(IInvokableInstance peer)
{
return (target) ->
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/ring/CleanupFailureTest.java
b/test/distributed/org/apache/cassandra/distributed/test/ring/CleanupFailureTest.java
new file mode 100644
index 0000000000..9c8f71fde9
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/ring/CleanupFailureTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.ring;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.distributed.shared.NetworkTopology;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+
+import static org.junit.Assert.assertEquals;
+import static
org.apache.cassandra.distributed.action.GossipHelper.statusToBootstrap;
+import static
org.apache.cassandra.distributed.action.GossipHelper.statusToDecommission;
+import static
org.apache.cassandra.distributed.action.GossipHelper.withProperty;
+import static
org.apache.cassandra.distributed.test.ring.BootstrapTest.populate;
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static
org.apache.cassandra.distributed.api.TokenSupplier.evenlyDistributedTokens;
+
+public class CleanupFailureTest extends TestBaseImpl
+{
+ @Test
+ public void cleanupDuringDecommissionTest() throws Throwable
+ {
+ try (Cluster cluster = builder().withNodes(2)
+
.withTokenSupplier(evenlyDistributedTokens(2))
+
.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(2, "dc0", "rack0"))
+ .withConfig(config ->
config.with(NETWORK, GOSSIP))
+ .start())
+ {
+ IInvokableInstance nodeToDecommission = cluster.get(1);
+ IInvokableInstance nodeToRemainInCluster = cluster.get(2);
+
+ // Start decommission on nodeToDecommission
+ cluster.forEach(statusToDecommission(nodeToDecommission));
+
+ // Add data to cluster while node is decommissioning
+ int NUM_ROWS = 100;
+ populate(cluster, 0, NUM_ROWS, 1, 1, ConsistencyLevel.ONE);
+ cluster.forEach(c -> c.flush(KEYSPACE));
+
+ // Check data before cleanup on nodeToRemainInCluster
+ assertEquals(100, nodeToRemainInCluster.executeInternal("SELECT *
FROM " + KEYSPACE + ".tbl").length);
+
+ // Run cleanup on nodeToRemainInCluster
+ NodeToolResult result =
nodeToRemainInCluster.nodetoolResult("cleanup");
+ result.asserts().failure();
+ result.asserts().stderrContains("Node is involved in cluster
membership changes. Not safe to run cleanup.");
+
+ // Check data after cleanup on nodeToRemainInCluster
+ assertEquals(100, nodeToRemainInCluster.executeInternal("SELECT *
FROM " + KEYSPACE + ".tbl").length);
+ }
+ }
+
+ @Test
+ public void cleanupDuringBootstrapTest() throws Throwable
+ {
+ int originalNodeCount = 1;
+ int expandedNodeCount = originalNodeCount + 1;
+
+ try (Cluster cluster = builder().withNodes(originalNodeCount)
+
.withTokenSupplier(evenlyDistributedTokens(expandedNodeCount))
+
.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount,
"dc0", "rack0"))
+ .withConfig(config ->
config.with(NETWORK, GOSSIP))
+ .start())
+ {
+ IInstanceConfig config = cluster.newInstanceConfig();
+ IInvokableInstance bootstrappingNode = cluster.bootstrap(config);
+ withProperty("cassandra.join_ring", false,
+ () -> bootstrappingNode.startup(cluster));
+
+ // Start bootstrap on bootstrappingNode
+ cluster.forEach(statusToBootstrap(bootstrappingNode));
+
+ // Add data to cluster while node is bootstrapping
+ int NUM_ROWS = 100;
+ populate(cluster, 0, NUM_ROWS, 1, 2, ConsistencyLevel.ONE);
+ cluster.forEach(c -> c.flush(KEYSPACE));
+
+ // Check data before cleanup on bootstrappingNode
+ assertEquals(NUM_ROWS, bootstrappingNode.executeInternal("SELECT *
FROM " + KEYSPACE + ".tbl").length);
+
+ // Run cleanup on bootstrappingNode
+ NodeToolResult result =
bootstrappingNode.nodetoolResult("cleanup");
+ result.asserts().stderrContains("Node is involved in cluster
membership changes. Not safe to run cleanup.");
+
+ // Check data after cleanup on bootstrappingNode
+ assertEquals(NUM_ROWS, bootstrappingNode.executeInternal("SELECT *
FROM " + KEYSPACE + ".tbl").length);
+ }
+ }
+}
\ No newline at end of file
diff --git a/test/unit/org/apache/cassandra/db/CleanupTest.java
b/test/unit/org/apache/cassandra/db/CleanupTest.java
index 6bb6433db1..f59bdd6e7f 100644
--- a/test/unit/org/apache/cassandra/db/CleanupTest.java
+++ b/test/unit/org/apache/cassandra/db/CleanupTest.java
@@ -116,9 +116,11 @@ public class CleanupTest
}
@Test
- public void testCleanup() throws ExecutionException, InterruptedException
+ public void testCleanup() throws ExecutionException, InterruptedException,
UnknownHostException
{
- StorageService.instance.getTokenMetadata().clearUnsafe();
+ TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+ tmd.clearUnsafe();
+ tmd.updateNormalToken(token(new byte[]{ 50 }),
InetAddressAndPort.getByName("127.0.0.1"));
Keyspace keyspace = Keyspace.open(KEYSPACE1);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]