This is an automated email from the ASF dual-hosted git repository.
paulo pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7fe688b000 CASSANDRA-21024: Add configuration to disk usage guardrails to stop writes across all replicas of a keyspace when any node replicating that keyspace exceeds the disk usage failure threshold.
7fe688b000 is described below
commit 7fe688b00096319453afcc5c3da3331816c64072
Author: ireath <[email protected]>
AuthorDate: Thu Nov 13 19:46:11 2025 -0500
CASSANDRA-21024: Add configuration to disk usage guardrails to stop writes
across all replicas of a keyspace when any node replicating that keyspace
exceeds the disk usage failure threshold.
This commit adds a new configuration,
data_disk_usage_keyspace_wide_protection_enabled, which ensures that if any
node which replicates a given keyspace is full, all writes to that keyspace are
blocked.
patch by Isaac Reath; reviewed by Stefan Miklosovic, Paulo Motta for CASSANDRA-21024
Closes #4547
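
For operators, a minimal sketch of how this could be enabled in cassandra.yaml (the warn/fail threshold values below are purely illustrative; the only key introduced by this patch is data_disk_usage_keyspace_wide_protection_enabled):

    data_disk_usage_percentage_warn_threshold: 80    # illustrative value, pre-existing guardrail
    data_disk_usage_percentage_fail_threshold: 90    # illustrative value, pre-existing guardrail
    # new in this patch: block writes to the whole keyspace once any node replicating it is FULL
    data_disk_usage_keyspace_wide_protection_enabled: true

The flag can also be toggled at runtime through the GuardrailsMBean operation setDataDiskUsageKeyspaceWideProtectionEnabled(boolean) added by this patch.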
---
CHANGES.txt | 1 +
conf/cassandra.yaml | 4 +
conf/cassandra_latest.yaml | 4 +
src/java/org/apache/cassandra/config/Config.java | 1 +
.../apache/cassandra/config/GuardrailsOptions.java | 14 +
.../cql3/statements/ModificationStatement.java | 31 +-
.../apache/cassandra/db/guardrails/Guardrails.java | 25 ++
.../cassandra/db/guardrails/GuardrailsMBean.java | 11 +
.../service/disk/usage/DiskUsageBroadcaster.java | 140 ++++++++-
.../org/apache/cassandra/utils/NoSpamLogger.java | 10 +
...ailDataDiskUsageKeyspaceWideProtectionTest.java | 324 +++++++++++++++++++++
.../nodetool/GuardrailsConfigCommandsTest.java | 37 +--
12 files changed, 579 insertions(+), 23 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 3adaa9be91..c2df8be752 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.1
+ * Add configuration to disk usage guardrails to stop writes across all replicas of a keyspace when any node replicating that keyspace exceeds the disk usage failure threshold. (CASSANDRA-21024)
* BETWEEN where token(Y) > token(Z) returns wrong answer (CASSANDRA-20154)
* Optimize memtable flush logic (CASSANDRA-21083)
 * No need to evict already prepared statements, as it creates a race condition between multiple threads (CASSANDRA-17401)
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index b207ff289b..5f3a935be3 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -2517,6 +2517,10 @@ drop_compact_storage_enabled: false
# Min unit: B
# data_disk_usage_max_disk_size:
#
+# Configures the disk usage guardrails to block all writes to a keyspace if any node that replicates that
+# keyspace is full. By default, this is disabled.
+# data_disk_usage_keyspace_wide_protection_enabled: false
+#
# Guardrail to warn or fail when the minimum replication factor is lesser than threshold.
# This would also apply to system keyspaces.
# Suggested value for use in production: 2 or higher
diff --git a/conf/cassandra_latest.yaml b/conf/cassandra_latest.yaml
index 2e5c8fee6b..17638deadb 100644
--- a/conf/cassandra_latest.yaml
+++ b/conf/cassandra_latest.yaml
@@ -2295,6 +2295,10 @@ drop_compact_storage_enabled: false
# Min unit: B
# data_disk_usage_max_disk_size:
#
+# Configures the disk usage guardrails to block all writes to a keyspace if any node that replicates that
+# keyspace is full. By default, this is disabled.
+# data_disk_usage_keyspace_wide_protection_enabled: false
+#
# Guardrail to warn or fail when the minimum replication factor is lesser than threshold.
# This would also apply to system keyspaces.
# Suggested value for use in production: 2 or higher
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 193c77c8f4..2f290d8182 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -985,6 +985,7 @@ public class Config
public volatile int data_disk_usage_percentage_warn_threshold = -1;
public volatile int data_disk_usage_percentage_fail_threshold = -1;
public volatile DataStorageSpec.LongBytesBound data_disk_usage_max_disk_size = null;
+ public volatile boolean data_disk_usage_keyspace_wide_protection_enabled = false;
public volatile int minimum_replication_factor_warn_threshold = -1;
public volatile int minimum_replication_factor_fail_threshold = -1;
public volatile int maximum_replication_factor_warn_threshold = -1;
diff --git a/src/java/org/apache/cassandra/config/GuardrailsOptions.java b/src/java/org/apache/cassandra/config/GuardrailsOptions.java
index d10a2490ef..7443565d89 100644
--- a/src/java/org/apache/cassandra/config/GuardrailsOptions.java
+++ b/src/java/org/apache/cassandra/config/GuardrailsOptions.java
@@ -989,6 +989,20 @@ public class GuardrailsOptions implements GuardrailsConfig
                                  x -> config.data_disk_usage_percentage_fail_threshold = x);
}
+
+ public boolean getDataDiskUsageKeyspaceWideProtectionEnabled()
+ {
+ return config.data_disk_usage_keyspace_wide_protection_enabled;
+ }
+
+ public void setDataDiskUsageKeyspaceWideProtectionEnabled(boolean enabled)
+ {
+        updatePropertyWithLogging("data_disk_usage_keyspace_wide_protection_enabled",
+                                  enabled,
+                                  () -> config.data_disk_usage_keyspace_wide_protection_enabled,
+                                  x -> config.data_disk_usage_keyspace_wide_protection_enabled = x);
+ }
+
@Override
public DataStorageSpec.LongBytesBound getDataDiskUsageMaxDiskSize()
{
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 6f37effc5c..118f2c1fa4 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -103,6 +103,7 @@ import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.exceptions.RequestValidationException;
import org.apache.cassandra.exceptions.UnauthorizedException;
+import org.apache.cassandra.locator.NetworkTopologyStrategy;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.locator.ReplicaLayout;
import org.apache.cassandra.metrics.ClientRequestSizeMetrics;
@@ -124,6 +125,7 @@ import org.apache.cassandra.service.disk.usage.DiskUsageBroadcaster;
import org.apache.cassandra.service.paxos.Ballot;
import org.apache.cassandra.service.paxos.BallotGenerator;
import org.apache.cassandra.service.paxos.Commit.Proposal;
+import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.transport.Dispatcher;
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.triggers.TriggerExecutor;
@@ -412,15 +414,36 @@ public abstract class ModificationStatement implements CQLStatement.SingleKeyspa
public void validateDiskUsage(QueryOptions options, ClientState state)
{
-        // reject writes if any replica exceeds disk usage failure limit or warn if it exceeds warn limit
-        if (Guardrails.replicaDiskUsage.enabled(state) && DiskUsageBroadcaster.instance.hasStuffedOrFullNode())
+
+        if (Guardrails.diskUsageKeyspaceWideProtection.enabled(state) &&
+            Guardrails.instance.getDataDiskUsageKeyspaceWideProtectionEnabled() &&
+            DiskUsageBroadcaster.instance.hasStuffedOrFullNode())
+        {
+            Keyspace keyspace = Keyspace.open(keyspace());
+            // If the keyspace is using NetworkTopologyStrategy then we can check each datacenter on which
+            // the keyspace is replicated.
+            if (keyspace.getMetadata().replicationStrategy instanceof NetworkTopologyStrategy)
+            {
+                for (String datacenter : ((NetworkTopologyStrategy) keyspace.getMetadata().replicationStrategy).getDatacenters())
+                {
+                    Guardrails.diskUsageKeyspaceWideProtection.guard(datacenter, state);
+                }
+            }
+            // Otherwise, if we are using SimpleStrategy then we have to check if any datacenter contains a full node.
+            else
+            {
+                for (String datacenter : ClusterMetadata.current().directory.knownDatacenters())
+                {
+                    Guardrails.diskUsageKeyspaceWideProtection.guard(datacenter, state);
+                }
+            }
+        }
+        else if (Guardrails.replicaDiskUsage.enabled(state) && DiskUsageBroadcaster.instance.hasStuffedOrFullNode())
        {
            Keyspace keyspace = Keyspace.open(keyspace());
-
            for (ByteBuffer key : buildPartitionKeyNames(options, state))
            {
                Token token = metadata().partitioner.getToken(key);
-
                for (Replica replica : ReplicaLayout.forTokenWriteLiveAndDown(keyspace, token).all())
                {
                    Guardrails.replicaDiskUsage.guard(replica.endpoint(), state);
diff --git a/src/java/org/apache/cassandra/db/guardrails/Guardrails.java b/src/java/org/apache/cassandra/db/guardrails/Guardrails.java
index 7de915190f..6738d9a7d9 100644
--- a/src/java/org/apache/cassandra/db/guardrails/Guardrails.java
+++ b/src/java/org/apache/cassandra/db/guardrails/Guardrails.java
@@ -556,6 +556,19 @@ public final class Guardrails implements GuardrailsMBean
                     (isWarning, value) ->
                     isWarning ? "Replica disk usage exceeds warning threshold"
                               : "Write request failed because disk usage exceeds failure threshold");
+    /**
+     * Guardrail on the data disk usage of replicas across a datacenter which replicates a given keyspace.
+     * This is used at write time to verify the status of any node which might replicate a given keyspace.
+     */
+    public static final Predicates<String> diskUsageKeyspaceWideProtection =
+    new Predicates<>("disk_usage_keyspace_wide_protection",
+                     null,
+                     state -> DiskUsageBroadcaster.instance::isDatacenterStuffed,
+                     state -> DiskUsageBroadcaster.instance::isDatacenterFull,
+                     (isWarning, value) ->
+                     isWarning ? "Disk usage in keyspace datacenter exceeds warning threshold"
+                               : "Write request failed because disk usage exceeds failure threshold in keyspace datacenter.");
+
/**
* Guardrail on passwords for CREATE / ALTER ROLE statements.
*/
@@ -1597,6 +1610,18 @@ public final class Guardrails implements GuardrailsMBean
DEFAULT_CONFIG.setDataDiskUsageMaxDiskSize(sizeFromString(size));
}
+ @Override
+ public boolean getDataDiskUsageKeyspaceWideProtectionEnabled()
+ {
+ return DEFAULT_CONFIG.getDataDiskUsageKeyspaceWideProtectionEnabled();
+ }
+
+ @Override
+ public void setDataDiskUsageKeyspaceWideProtectionEnabled(boolean enabled)
+ {
+ DEFAULT_CONFIG.setDataDiskUsageKeyspaceWideProtectionEnabled(enabled);
+ }
+
@Override
public int getMinimumReplicationFactorWarnThreshold()
{
diff --git a/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java b/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java
index 108b721fa2..e2e9cd74dd 100644
--- a/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java
+++ b/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java
@@ -874,6 +874,17 @@ public interface GuardrailsMBean
@Nullable
String getDataDiskUsageMaxDiskSize();
+    /**
+     * @return whether writes to an entire keyspace should be blocked when a single node replicating that
+     * keyspace is full; {@code true} if this behavior is enabled, {@code false} otherwise.
+     */
+    boolean getDataDiskUsageKeyspaceWideProtectionEnabled();
+
+    /**
+     * @param enabled Enables or disables blocking writes for a keyspace if a node replicating that keyspace is full.
+     */
+    void setDataDiskUsageKeyspaceWideProtectionEnabled(boolean enabled);
+
    /**
     * @param size The max disk size of the data directories when calculating disk usage thresholds, as a string
     *             formatted as in, for example, {@code 10GiB}, {@code 20MiB}, {@code 30KiB} or {@code 40B}.
diff --git a/src/java/org/apache/cassandra/service/disk/usage/DiskUsageBroadcaster.java b/src/java/org/apache/cassandra/service/disk/usage/DiskUsageBroadcaster.java
index 9f638e3313..ceb8265318 100644
--- a/src/java/org/apache/cassandra/service/disk/usage/DiskUsageBroadcaster.java
+++ b/src/java/org/apache/cassandra/service/disk/usage/DiskUsageBroadcaster.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.service.disk.usage;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
@@ -27,13 +28,16 @@ import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Locator;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.tcm.membership.Location;
import org.apache.cassandra.utils.NoSpamLogger;
/**
@@ -50,6 +54,8 @@ public class DiskUsageBroadcaster implements IEndpointStateChangeSubscriber
private final DiskUsageMonitor monitor;
private final ConcurrentMap<InetAddressAndPort, DiskUsageState> usageInfo = new ConcurrentHashMap<>();
private volatile boolean hasStuffedOrFullNode = false;
+ private final ConcurrentMap<String, Set<InetAddressAndPort>> fullNodesByDatacenter = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, Set<InetAddressAndPort>> stuffedNodesByDatacenter = new ConcurrentHashMap<>();
@VisibleForTesting
public DiskUsageBroadcaster(DiskUsageMonitor monitor)
@@ -83,6 +89,34 @@ public class DiskUsageBroadcaster implements IEndpointStateChangeSubscriber
return state(endpoint).isStuffed();
}
+    /**
+     * @return {@code true} if any node in {@code datacenter} has FULL disk usage.
+     */
+    @VisibleForTesting
+    public boolean isDatacenterFull(String datacenter)
+    {
+        if (!hasStuffedOrFullNode())
+        {
+            return false;
+        }
+        Set<InetAddressAndPort> fullNodes = fullNodesByDatacenter.get(datacenter);
+        return fullNodes != null && !fullNodes.isEmpty();
+    }
+
+    /**
+     * @return {@code true} if any node in {@code datacenter} has STUFFED disk usage.
+     */
+    @VisibleForTesting
+    public boolean isDatacenterStuffed(String datacenter)
+    {
+        if (!hasStuffedOrFullNode())
+        {
+            return false;
+        }
+        Set<InetAddressAndPort> stuffedNodes = stuffedNodesByDatacenter.get(datacenter);
+        return stuffedNodes != null && !stuffedNodes.isEmpty();
+    }
+
@VisibleForTesting
public DiskUsageState state(InetAddressAndPort endpoint)
{
@@ -114,8 +148,9 @@ public class DiskUsageBroadcaster implements IEndpointStateChangeSubscriber
noSpamLogger.warn(String.format("Found unknown DiskUsageState: %s. Using default state %s instead.",
                                value.value, usageState));
}
- usageInfo.put(endpoint, usageState);
+ computeUsageStateForEpDatacenter(endpoint, usageState);
+ usageInfo.put(endpoint, usageState);
hasStuffedOrFullNode = usageState.isStuffedOrFull() || computeHasStuffedOrFullNode();
}
@@ -131,6 +166,85 @@ public class DiskUsageBroadcaster implements IEndpointStateChangeSubscriber
return false;
}
+    /**
+     * Update the sets of full and stuffed nodes by datacenter based on the disk usage state for the given endpoint.
+     * If the node is FULL, add it to the full set for its datacenter; if STUFFED, to the stuffed set; otherwise,
+     * remove it from both. This method is idempotent: adding an already-present node or removing an absent node
+     * has no effect.
+     *
+     * @param endpoint The endpoint whose state has changed.
+     * @param usageState The new disk usage state value.
+     */
+    private void computeUsageStateForEpDatacenter(InetAddressAndPort endpoint, DiskUsageState usageState)
+    {
+        Location location = location(endpoint);
+        if (location.equals(Location.UNKNOWN))
+        {
+            noSpamLogger.warn("Unable to track disk usage by datacenter for endpoint {} because we are unable to determine its location.",
+                              endpoint);
+            return;
+        }
+
+        String datacenter = location.datacenter;
+        if (usageState.isFull())
+        {
+            // Add this node to the set of full nodes for its datacenter and remove it from the stuffed nodes
+            // if it was there.
+            fullNodesByDatacenter.computeIfAbsent(datacenter, dc -> ConcurrentHashMap.newKeySet())
+                                 .add(endpoint);
+            noSpamLogger.debug("Endpoint {} is FULL, added to full nodes set for datacenter {}", endpoint, datacenter);
+            Set<InetAddressAndPort> stuffedNodes = stuffedNodesByDatacenter.get(datacenter);
+            if (stuffedNodes != null && stuffedNodes.remove(endpoint))
+            {
+                noSpamLogger.debug("Endpoint {} is now FULL. Removed it from the stuffed nodes set for datacenter {}",
+                                   endpoint, datacenter);
+            }
+        }
+        else if (usageState.isStuffed())
+        {
+            // Add this node to the set of stuffed nodes for its datacenter and remove it from the full nodes
+            // if it was there.
+            stuffedNodesByDatacenter.computeIfAbsent(datacenter, dc -> ConcurrentHashMap.newKeySet())
+                                    .add(endpoint);
+            noSpamLogger.debug("Endpoint {} is now STUFFED. Added it to the stuffed nodes set for datacenter {}",
+                               endpoint, datacenter);
+            Set<InetAddressAndPort> fullNodes = fullNodesByDatacenter.get(datacenter);
+            if (fullNodes != null && fullNodes.remove(endpoint))
+            {
+                noSpamLogger.debug("Endpoint {} is now STUFFED. Removed it from the full nodes set for datacenter {}",
+                                   endpoint, datacenter);
+            }
+        }
+        else
+        {
+            // Remove this node from the set of full nodes and the set of stuffed nodes for its datacenter if it was there.
+            Set<InetAddressAndPort> fullNodes = fullNodesByDatacenter.get(datacenter);
+            if (fullNodes != null && fullNodes.remove(endpoint))
+            {
+                noSpamLogger.debug("Endpoint {} is no longer STUFFED or FULL, removed from the full nodes set for datacenter {}",
+                                   endpoint, datacenter);
+            }
+            Set<InetAddressAndPort> stuffedNodes = stuffedNodesByDatacenter.get(datacenter);
+            if (stuffedNodes != null && stuffedNodes.remove(endpoint))
+            {
+                noSpamLogger.debug("Endpoint {} is no longer STUFFED, removed from the stuffed nodes set for datacenter {}",
+                                   endpoint, datacenter);
+            }
+        }
+    }
+
+    private Location location(InetAddressAndPort endpoint)
+    {
+        Locator locator = DatabaseDescriptor.getLocator();
+        if (locator == null)
+        {
+            noSpamLogger.warn("Unable to track disk usage by datacenter for endpoint {} because locator is null",
+                              endpoint);
+            return Location.UNKNOWN;
+        }
+        Location location = locator.location(endpoint);
+        return location != null ? location : Location.UNKNOWN;
+    }
+
@Override
public void onJoin(InetAddressAndPort endpoint, EndpointState epState)
{
@@ -164,10 +278,34 @@ public class DiskUsageBroadcaster implements IEndpointStateChangeSubscriber
@Override
public void onRemove(InetAddressAndPort endpoint)
{
+ updateDiskUsageStateForDatacenterOnRemoval(endpoint);
usageInfo.remove(endpoint);
hasStuffedOrFullNode = usageInfo.values().stream().anyMatch(DiskUsageState::isStuffedOrFull);
}
+    private void updateDiskUsageStateForDatacenterOnRemoval(InetAddressAndPort endpoint)
+    {
+        Location nodeLocation = location(endpoint);
+        if (nodeLocation.equals(Location.UNKNOWN))
+        {
+            logger.debug("Unable to determine location for removed endpoint {}. Will not update datacenter tracking.", endpoint);
+            return;
+        }
+
+        String datacenter = nodeLocation.datacenter;
+        // Remove the endpoint from the full nodes and stuffed nodes sets for its datacenter
+        Set<InetAddressAndPort> fullNodes = fullNodesByDatacenter.get(datacenter);
+        if (fullNodes != null && fullNodes.remove(endpoint))
+        {
+            logger.debug("Removed endpoint {} from full nodes set for datacenter {} on node removal", endpoint, datacenter);
+        }
+        Set<InetAddressAndPort> stuffedNodes = stuffedNodesByDatacenter.get(datacenter);
+        if (stuffedNodes != null && stuffedNodes.remove(endpoint))
+        {
+            logger.debug("Removed endpoint {} from stuffed nodes set for datacenter {} on node removal", endpoint, datacenter);
+        }
+    }
+
private void updateDiskUsage(InetAddressAndPort endpoint, EndpointState state)
{
VersionedValue localValue = state.getApplicationState(ApplicationState.DISK_USAGE);
diff --git a/src/java/org/apache/cassandra/utils/NoSpamLogger.java b/src/java/org/apache/cassandra/utils/NoSpamLogger.java
index 41ffeec50d..a2bf815b30 100644
--- a/src/java/org/apache/cassandra/utils/NoSpamLogger.java
+++ b/src/java/org/apache/cassandra/utils/NoSpamLogger.java
@@ -235,6 +235,16 @@ public class NoSpamLogger
return new NoSpamLogger(wrapped, minInterval, timeUnit);
}
+ public boolean debug(long nowNanos, String s, Object... objects)
+ {
+ return NoSpamLogger.this.log( Level.DEBUG, s, nowNanos, objects);
+ }
+
+ public boolean debug(String s, Object... objects)
+ {
+ return NoSpamLogger.this.debug(CLOCK.nanoTime(), s, objects);
+ }
+
public boolean info(long nowNanos, String s, Object... objects)
{
return NoSpamLogger.this.log( Level.INFO, s, nowNanos, objects);
diff --git a/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailDataDiskUsageKeyspaceWideProtectionTest.java b/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailDataDiskUsageKeyspaceWideProtectionTest.java
new file mode 100644
index 0000000000..e8f9de648d
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailDataDiskUsageKeyspaceWideProtectionTest.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.guardrails;
+
+import java.io.IOException;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.InvalidQueryException;
+
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.db.guardrails.Guardrails;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.distributed.util.Auth;
+import org.apache.cassandra.service.disk.usage.DiskUsageBroadcaster;
+import org.apache.cassandra.service.disk.usage.DiskUsageState;
+
+import static org.apache.cassandra.config.CassandraRelevantProperties.BOOTSTRAP_SKIP_SCHEMA_CHECK;
+import static org.apache.cassandra.distributed.shared.ClusterUtils.awaitRingJoin;
+import static org.apache.cassandra.distributed.shared.ClusterUtils.replaceHostAndStart;
+
+public class GuardrailDataDiskUsageKeyspaceWideProtectionTest extends TestBaseImpl
+{
+ public static final int NODE_TO_MARK_AS_FULL = 2;
+ private static final int NUM_ROWS = 100;
+ private static final String NTS_KEYSPACE_NAME = "nts_keyspace1";
+ private static Cluster cluster;
+ private static com.datastax.driver.core.Cluster driverCluster;
+ private static Session driverSession;
+
+ @Before
+ public void setupCluster() throws IOException
+ {
+ // speed up the task that calculates and propagates the disk usage info
+ CassandraRelevantProperties.DISK_USAGE_MONITOR_INTERVAL_MS.setInt(100);
+ TokenSupplier even = TokenSupplier.evenlyDistributedTokens(2);
+ // build a 2-node cluster with RF=1
+        cluster = init(Cluster.build(2)
+                              .withInstanceInitializer(GuardrailDiskUsageTest.DiskStateInjection::install)
+                              .withConfig(c -> c.with(Feature.GOSSIP, Feature.NETWORK, Feature.NATIVE_PROTOCOL)
+                                                .set("data_disk_usage_max_disk_size", "10GiB")
+                                                .set("data_disk_usage_percentage_warn_threshold", 98)
+                                                .set("data_disk_usage_percentage_fail_threshold", 99)
+                                                .set("data_disk_usage_keyspace_wide_protection_enabled", true)
+                                                .set("authenticator", "PasswordAuthenticator")
+                                                .set("initial_location_provider", "SimpleLocationProvider"))
+                              .withTokenSupplier(node -> even.token(node == 3 ? 2 : node))
+                              .start(), 1);
+
+ Auth.waitForExistingRoles(cluster.get(1));
+
+        // create a regular user, since the default superuser is excluded from guardrails
+        com.datastax.driver.core.Cluster.Builder builder = com.datastax.driver.core.Cluster.builder().addContactPoint("127.0.0.1");
+        try (com.datastax.driver.core.Cluster c = builder.withCredentials("cassandra", "cassandra").build();
+             Session session = c.connect())
+        {
+            session.execute("CREATE USER test WITH PASSWORD 'test'");
+            session.execute("CREATE KEYSPACE " + NTS_KEYSPACE_NAME + " WITH REPLICATION={'class': 'NetworkTopologyStrategy', 'datacenter1': 1}");
+        }
+
+        // connect using that user; we use the driver to get access to the client warnings
+        driverCluster = builder.withCredentials("test", "test").build();
+        driverSession = driverCluster.connect();
+ }
+
+ @After
+ public void cleanup() throws IOException
+ {
+ if (driverSession != null)
+ driverSession.close();
+
+ if (driverCluster != null)
+ driverCluster.close();
+
+ if (cluster != null)
+ cluster.close();
+ }
+
+ @Test
+ public void testDiskUsageWithStopWritesForKeyspaceOnFail() throws Throwable
+ {
+ String tableName = KEYSPACE + ".guardrail_disk_usage_tbl";
+ testDataDiskUsageKeyspaceWideProtectionGuardrailCommon(tableName);
+ }
+
+ @Test
+    public void testDataDiskUsageKeyspaceWideProtectionGuardrailDatacenterFullAndNetworkTopologyStrategyUsedShouldBlockWrites()
+ {
+ String tableName = NTS_KEYSPACE_NAME + ".guardrail_disk_usage_tbl";
+ testDataDiskUsageKeyspaceWideProtectionGuardrailCommon(tableName);
+ }
+
+    private static void testDataDiskUsageKeyspaceWideProtectionGuardrailCommon(String tableName)
+    {
+        cluster.schemaChange(String.format("CREATE TABLE %s (k int PRIMARY KEY, v int)", tableName));
+        String insert = String.format("INSERT INTO %s(k, v) VALUES (?, 0)", tableName);
+        ensureGuardrailCommon(insert, NODE_TO_MARK_AS_FULL);
+
+        // Finally, if both nodes go back to SPACIOUS, all queries will succeed again
+        GuardrailDiskUsageTest.DiskStateInjection.setState(cluster, NODE_TO_MARK_AS_FULL, DiskUsageState.SPACIOUS);
+        for (int i = 0; i < NUM_ROWS; i++)
+        {
+            ResultSet rs = driverSession.execute(insert, i);
+            Assertions.assertThat(rs.getExecutionInfo().getWarnings()).isEmpty();
+        }
+    }
+
+ @Test
+    public void testDiskUsageWithStopWriteForKeyspaceWhenFullNodeReplaceWithSpaciousShouldNotBlock() throws Throwable
+ {
+        String tableName = KEYSPACE + ".guardrail_disk_usage_tbl";
+        cluster.schemaChange(String.format("CREATE TABLE %s (k int PRIMARY KEY, v int)", tableName));
+        String insert = String.format("INSERT INTO %s(k, v) VALUES (?, 0)", tableName);
+
+        ensureGuardrailCommon(insert, NODE_TO_MARK_AS_FULL);
+        // Once we replace the node with a SPACIOUS node, writes should succeed again.
+        IInvokableInstance nodeToRemove = cluster.get(NODE_TO_MARK_AS_FULL);
+        ClusterUtils.stopUnchecked(nodeToRemove);
+        IInvokableInstance replacingNode = replaceHostAndStart(cluster, nodeToRemove, props -> {
+            // since we have a downed host, an old schema version might show up that
+            // can't be fetched because the host is down...
+            props.set(BOOTSTRAP_SKIP_SCHEMA_CHECK, true);
+        });
+        awaitRingJoin(cluster.get(1), replacingNode);
+        awaitRingJoin(replacingNode, cluster.get(1));
+        for (int i = 0; i < NUM_ROWS; i++)
+        {
+            ResultSet rs = driverSession.execute(insert, i);
+            Assertions.assertThat(rs.getExecutionInfo().getWarnings()).isEmpty();
+        }
+    }
+
+ @Test
+    public void testDiskUsageWithStopWriteForKeyspaceWhenFullNodeIpChangeAndBecomesSpaciousShouldNotBlock() throws Throwable
+ {
+        String tableName = KEYSPACE + ".guardrail_disk_usage_tbl";
+        cluster.schemaChange(String.format("CREATE TABLE %s (k int PRIMARY KEY, v int)", tableName));
+        String insert = String.format("INSERT INTO %s(k, v) VALUES (?, 0)", tableName);
+        ensureGuardrailCommon(insert, NODE_TO_MARK_AS_FULL);
+
+        // If the node goes offline and comes back online with a different IP, writes should still fail.
+        IInvokableInstance nodeToChangeIp = cluster.get(NODE_TO_MARK_AS_FULL);
+        ClusterUtils.stopUnchecked(nodeToChangeIp);
+        ClusterUtils.updateAddress(nodeToChangeIp, "127.0.0.4");
+        nodeToChangeIp.startup();
+        GuardrailDiskUsageTest.DiskStateInjection.setState(cluster, NODE_TO_MARK_AS_FULL, DiskUsageState.FULL);
+        int numFailures = 0;
+        for (int i = 0; i < NUM_ROWS; i++)
+        {
+            try
+            {
+                driverSession.execute(insert, i);
+                Assertions.fail("Should have failed");
+            }
+            catch (InvalidQueryException e)
+            {
+                numFailures++;
+            }
+        }
+        Assertions.assertThat(numFailures).isEqualTo(NUM_ROWS);
+
+        // If the node then becomes SPACIOUS, writes should succeed again.
+        GuardrailDiskUsageTest.DiskStateInjection.setState(cluster, NODE_TO_MARK_AS_FULL, DiskUsageState.SPACIOUS);
+        for (int i = 0; i < NUM_ROWS; i++)
+        {
+            ResultSet rs = driverSession.execute(insert, i);
+            Assertions.assertThat(rs.getExecutionInfo().getWarnings()).isEmpty();
+        }
+ }
+
+
+ @Test
+    public void testDiskUsageWithStopWritesForKeyspaceOnFailWhenFullNodeLeavesShouldStopBlocking()
+ {
+        String tableName = KEYSPACE + ".guardrail_disk_usage_tbl";
+        cluster.schemaChange(String.format("CREATE TABLE %s (k int PRIMARY KEY, v int)", tableName));
+        String insert = String.format("INSERT INTO %s(k, v) VALUES (?, 0)", tableName);
+        ensureGuardrailCommon(insert, NODE_TO_MARK_AS_FULL);
+
+        // If the FULL node leaves the cluster, then writes should succeed again.
+        IInvokableInstance nodeToRemove = cluster.get(NODE_TO_MARK_AS_FULL);
+        ClusterUtils.decommission(nodeToRemove);
+        for (int i = 0; i < NUM_ROWS; i++)
+        {
+            ResultSet rs = driverSession.execute(insert, i);
+            Assertions.assertThat(rs.getExecutionInfo().getWarnings()).isEmpty();
+        }
+ }
+
+ @Test
+    public void testDiskUsageWithStopWritesForKeyspaceWhenFlagDisabledShouldNotBlockEntireKeyspace()
+ {
+        String tableName = KEYSPACE + ".guardrail_disk_usage_tbl";
+        cluster.schemaChange(String.format("CREATE TABLE %s (k int PRIMARY KEY, v int)", tableName));
+        String insert = String.format("INSERT INTO %s(k, v) VALUES (?, 0)", tableName);
+        ensureGuardrailCommon(insert, NODE_TO_MARK_AS_FULL);
+
+        // Disable keyspace-wide protection.
+        // Writes which were destined for the FULL node should fail, but others should succeed.
+        cluster.get(1).runOnInstance(() -> Guardrails.instance.setDataDiskUsageKeyspaceWideProtectionEnabled(false));
+        int numFailures = 0;
+        for (int i = 0; i < NUM_ROWS; i++)
+        {
+            try
+            {
+                driverSession.execute(insert, i);
+            }
+            catch (InvalidQueryException e)
+            {
+                numFailures++;
+            }
+        }
+        Assertions.assertThat(numFailures).isBetween(1, NUM_ROWS - 1);
+ }
+
+ @Test
+    public void testDiskUsageWithStopWritesForKeyspaceWhenFlagToggledAndNodeLeavesShouldReEnableWrites()
+ {
+        String tableName = KEYSPACE + ".guardrail_disk_usage_tbl";
+        cluster.schemaChange(String.format("CREATE TABLE %s (k int PRIMARY KEY, v int)", tableName));
+        String insert = String.format("INSERT INTO %s(k, v) VALUES (?, 0)", tableName);
+        ensureGuardrailCommon(insert, NODE_TO_MARK_AS_FULL);
+        cluster.get(1).runOnInstance(() -> Guardrails.instance.setDataDiskUsageKeyspaceWideProtectionEnabled(false));
+        // If the FULL node leaves the cluster, then writes should succeed again.
+        IInvokableInstance nodeToRemove = cluster.get(NODE_TO_MARK_AS_FULL);
+        ClusterUtils.decommission(nodeToRemove);
+        for (int i = 0; i < NUM_ROWS; i++)
+        {
+            ResultSet rs = driverSession.execute(insert, i);
+            Assertions.assertThat(rs.getExecutionInfo().getWarnings()).isEmpty();
+        }
+        cluster.get(1).runOnInstance(() -> Guardrails.instance.setDataDiskUsageKeyspaceWideProtectionEnabled(true));
+        for (int i = 0; i < NUM_ROWS; i++)
+        {
+            ResultSet rs = driverSession.execute(insert, i);
+            Assertions.assertThat(rs.getExecutionInfo().getWarnings()).isEmpty();
+        }
+        cluster.get(1).runOnInstance(() -> Assertions.assertThat(DiskUsageBroadcaster.instance.isDatacenterFull("datacenter1")).isFalse());
+        cluster.get(1).runOnInstance(() -> Assertions.assertThat(DiskUsageBroadcaster.instance.isDatacenterStuffed("datacenter1")).isFalse());
+ }
+
+ @Test
+    public void testDiskUsageWithStopWritesForKeyspaceWhenFlagToggledAndNodeBecomesSpaciousShouldReEnableWrites()
+ {
+        String tableName = KEYSPACE + ".guardrail_disk_usage_tbl";
+        cluster.schemaChange(String.format("CREATE TABLE %s (k int PRIMARY KEY, v int)", tableName));
+        String insert = String.format("INSERT INTO %s(k, v) VALUES (?, 0)", tableName);
+        ensureGuardrailCommon(insert, NODE_TO_MARK_AS_FULL);
+        cluster.get(1).runOnInstance(() -> Guardrails.instance.setDataDiskUsageKeyspaceWideProtectionEnabled(false));
+        GuardrailDiskUsageTest.DiskStateInjection.setState(cluster, NODE_TO_MARK_AS_FULL, DiskUsageState.SPACIOUS);
+        cluster.get(1).runOnInstance(() -> Guardrails.instance.setDataDiskUsageKeyspaceWideProtectionEnabled(true));
+        for (int i = 0; i < NUM_ROWS; i++)
+        {
+            ResultSet rs = driverSession.execute(insert, i);
+            Assertions.assertThat(rs.getExecutionInfo().getWarnings()).isEmpty();
+        }
+        cluster.get(1).runOnInstance(() -> Assertions.assertThat(DiskUsageBroadcaster.instance.isDatacenterFull("datacenter1")).isFalse());
+        cluster.get(1).runOnInstance(() -> Assertions.assertThat(DiskUsageBroadcaster.instance.isDatacenterStuffed("datacenter1")).isFalse());
+ }
+
+    /**
+     * Ensures that the guardrail works in the common scenario across all tests (i.e., with both nodes SPACIOUS we
+     * succeed and with one FULL we fail).
+     *
+     * @param insert The insert statement which we will use to test the guardrail.
+     * @param node The node which we will mark as full.
+     */
+    private static void ensureGuardrailCommon(String insert, int node)
+    {
+        // With both nodes in SPACIOUS state, we can write without warnings or failures
+        for (int i = 0; i < NUM_ROWS; i++)
+        {
+            ResultSet rs = driverSession.execute(insert, i);
+            Assertions.assertThat(rs.getExecutionInfo().getWarnings()).isEmpty();
+        }
+
+        // If a node becomes FULL and data_disk_usage_keyspace_wide_protection_enabled is set,
+        // then all writes will fail regardless of node.
+        GuardrailDiskUsageTest.DiskStateInjection.setState(cluster, node, DiskUsageState.FULL);
+        int numFailures = 0;
+        for (int i = 0; i < NUM_ROWS; i++)
+        {
+            try
+            {
+                driverSession.execute(insert, i);
+                Assertions.fail("Should have failed");
+            }
+            catch (InvalidQueryException e)
+            {
+                numFailures++;
+            }
+        }
+        Assertions.assertThat(numFailures).isEqualTo(NUM_ROWS);
+    }
+}
diff --git a/test/unit/org/apache/cassandra/tools/nodetool/GuardrailsConfigCommandsTest.java b/test/unit/org/apache/cassandra/tools/nodetool/GuardrailsConfigCommandsTest.java
index d88949b473..37958178b7 100644
--- a/test/unit/org/apache/cassandra/tools/nodetool/GuardrailsConfigCommandsTest.java
+++ b/test/unit/org/apache/cassandra/tools/nodetool/GuardrailsConfigCommandsTest.java
@@ -198,24 +198,25 @@ public class GuardrailsConfigCommandsTest extends CQLTester
private static final String ALL_FLAGS_GETTER_OUTPUT =
- "allow_filtering_enabled true\n" +
- "alter_table_enabled true\n" +
- "bulk_load_enabled true\n" +
- "compact_tables_enabled true\n" +
- "drop_keyspace_enabled true\n" +
- "drop_truncate_table_enabled true\n" +
- "group_by_enabled true\n" +
- "intersect_filtering_query_enabled true\n" +
- "intersect_filtering_query_warned true\n" +
- "non_partition_restricted_index_query_enabled true\n" +
- "read_before_write_list_operations_enabled true\n" +
- "secondary_indexes_enabled true\n" +
- "simplestrategy_enabled true\n" +
- "uncompressed_tables_enabled true\n" +
- "user_timestamps_enabled true\n" +
- "vector_type_enabled true\n" +
- "zero_ttl_on_twcs_enabled true\n" +
- "zero_ttl_on_twcs_warned true\n";
+ "allow_filtering_enabled true\n" +
+ "alter_table_enabled true\n" +
+ "bulk_load_enabled true\n" +
+ "compact_tables_enabled true\n" +
+ "data_disk_usage_keyspace_wide_protection_enabled false\n" +
+ "drop_keyspace_enabled true\n" +
+ "drop_truncate_table_enabled true\n" +
+ "group_by_enabled true\n" +
+ "intersect_filtering_query_enabled true\n" +
+ "intersect_filtering_query_warned true\n" +
+ "non_partition_restricted_index_query_enabled true\n" +
+ "read_before_write_list_operations_enabled true\n" +
+ "secondary_indexes_enabled true\n" +
+ "simplestrategy_enabled true\n" +
+ "uncompressed_tables_enabled true\n" +
+ "user_timestamps_enabled true\n" +
+ "vector_type_enabled true\n" +
+ "zero_ttl_on_twcs_enabled true\n" +
+ "zero_ttl_on_twcs_warned true\n";
private static final String ALL_THRESHOLDS_GETTER_OUTPUT =
"collection_list_size_threshold [null, null] \n" +
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]