This is an automated email from the ASF dual-hosted git repository.

paulo pushed a commit to branch CASSANDRA-21024-trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit b51ea46c44f4c6da143a22ccd233b1bf9b463dd3
Author: ireath <[email protected]>
AuthorDate: Thu Nov 13 19:46:11 2025 -0500

    CASSANDRA-21024: Add configuration to disk usage guardrails to stop writes 
across all replicas of a keyspace when any node replicating that keyspace 
exceeds the disk usage failure threshold.
    
    This commit adds a new configuration, 
data_disk_usage_keyspace_wide_protection_enabled, which ensures that if any 
node which replicates a given keyspace is full, all writes to that keyspace are 
blocked.
    
    patch by Isaac Reath; reviewed by Stefan Miklosovic, Paulo Motta for 
CASSANDRA-21024
    
    Closes #4547
---
 CHANGES.txt                                        |   1 +
 conf/cassandra.yaml                                |   4 +
 conf/cassandra_latest.yaml                         |   4 +
 src/java/org/apache/cassandra/config/Config.java   |   1 +
 .../apache/cassandra/config/GuardrailsOptions.java |  14 +
 .../cql3/statements/ModificationStatement.java     |  31 +-
 .../apache/cassandra/db/guardrails/Guardrails.java |  25 ++
 .../cassandra/db/guardrails/GuardrailsMBean.java   |  11 +
 .../service/disk/usage/DiskUsageBroadcaster.java   | 140 ++++++++-
 .../org/apache/cassandra/utils/NoSpamLogger.java   |  10 +
 ...ailDataDiskUsageKeyspaceWideProtectionTest.java | 324 +++++++++++++++++++++
 .../nodetool/GuardrailsConfigCommandsTest.java     |  37 +--
 12 files changed, 579 insertions(+), 23 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 3adaa9be91..c2df8be752 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Add configuration to disk usage guardrails to stop writes across all 
replicas of a keyspace when any node replicating that keyspace exceeds the disk 
usage failure threshold. (CASSANDRA-21024)
  * BETWEEN where token(Y) > token(Z) returns wrong answer (CASSANDRA-20154)
  * Optimize memtable flush logic (CASSANDRA-21083)
  * No need to evict already prepared statements, as it creates a race 
condition between multiple threads (CASSANDRA-17401)
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index b207ff289b..5f3a935be3 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -2517,6 +2517,10 @@ drop_compact_storage_enabled: false
 # Min unit: B
 # data_disk_usage_max_disk_size:
 #
+# Configures the disk usage guardrails to block all writes to a keyspace if 
any node which replicates that keyspace
+# is full. By default, this is disabled.
+# data_disk_usage_keyspace_wide_protection_enabled: false
+#
 # Guardrail to warn or fail when the minimum replication factor is lesser than 
threshold.
 # This would also apply to system keyspaces.
 # Suggested value for use in production: 2 or higher
diff --git a/conf/cassandra_latest.yaml b/conf/cassandra_latest.yaml
index 2e5c8fee6b..17638deadb 100644
--- a/conf/cassandra_latest.yaml
+++ b/conf/cassandra_latest.yaml
@@ -2295,6 +2295,10 @@ drop_compact_storage_enabled: false
 # Min unit: B
 # data_disk_usage_max_disk_size:
 #
+# Configures the disk usage guardrails to block all writes to a keyspace if 
any node which replicates that keyspace
+# is full. By default, this is disabled.
+# data_disk_usage_keyspace_wide_protection_enabled: false
+#
 # Guardrail to warn or fail when the minimum replication factor is lesser than 
threshold.
 # This would also apply to system keyspaces.
 # Suggested value for use in production: 2 or higher
diff --git a/src/java/org/apache/cassandra/config/Config.java 
b/src/java/org/apache/cassandra/config/Config.java
index 193c77c8f4..2f290d8182 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -985,6 +985,7 @@ public class Config
     public volatile int data_disk_usage_percentage_warn_threshold = -1;
     public volatile int data_disk_usage_percentage_fail_threshold = -1;
     public volatile DataStorageSpec.LongBytesBound 
data_disk_usage_max_disk_size = null;
+    public volatile boolean data_disk_usage_keyspace_wide_protection_enabled = 
false;
     public volatile int minimum_replication_factor_warn_threshold = -1;
     public volatile int minimum_replication_factor_fail_threshold = -1;
     public volatile int maximum_replication_factor_warn_threshold = -1;
diff --git a/src/java/org/apache/cassandra/config/GuardrailsOptions.java 
b/src/java/org/apache/cassandra/config/GuardrailsOptions.java
index d10a2490ef..7443565d89 100644
--- a/src/java/org/apache/cassandra/config/GuardrailsOptions.java
+++ b/src/java/org/apache/cassandra/config/GuardrailsOptions.java
@@ -989,6 +989,20 @@ public class GuardrailsOptions implements GuardrailsConfig
                                   x -> 
config.data_disk_usage_percentage_fail_threshold = x);
     }
 
+
+    public boolean getDataDiskUsageKeyspaceWideProtectionEnabled()
+    {
+        return config.data_disk_usage_keyspace_wide_protection_enabled;
+    }
+
+    public void setDataDiskUsageKeyspaceWideProtectionEnabled(boolean enabled)
+    {
+        
updatePropertyWithLogging("data_disk_usage_keyspace_wide_protection_enabled",
+                                  enabled,
+                                  () -> 
config.data_disk_usage_keyspace_wide_protection_enabled,
+                                  x -> 
config.data_disk_usage_keyspace_wide_protection_enabled = x);
+    }
+
     @Override
     public DataStorageSpec.LongBytesBound getDataDiskUsageMaxDiskSize()
     {
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 6f37effc5c..118f2c1fa4 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -103,6 +103,7 @@ import 
org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.exceptions.RequestValidationException;
 import org.apache.cassandra.exceptions.UnauthorizedException;
+import org.apache.cassandra.locator.NetworkTopologyStrategy;
 import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.locator.ReplicaLayout;
 import org.apache.cassandra.metrics.ClientRequestSizeMetrics;
@@ -124,6 +125,7 @@ import 
org.apache.cassandra.service.disk.usage.DiskUsageBroadcaster;
 import org.apache.cassandra.service.paxos.Ballot;
 import org.apache.cassandra.service.paxos.BallotGenerator;
 import org.apache.cassandra.service.paxos.Commit.Proposal;
+import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.transport.Dispatcher;
 import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.triggers.TriggerExecutor;
@@ -412,15 +414,36 @@ public abstract class ModificationStatement implements 
CQLStatement.SingleKeyspa
 
     public void validateDiskUsage(QueryOptions options, ClientState state)
     {
-        // reject writes if any replica exceeds disk usage failure limit or 
warn if it exceeds warn limit
-        if (Guardrails.replicaDiskUsage.enabled(state) && 
DiskUsageBroadcaster.instance.hasStuffedOrFullNode())
+
+        if (Guardrails.diskUsageKeyspaceWideProtection.enabled(state) &&
+            
Guardrails.instance.getDataDiskUsageKeyspaceWideProtectionEnabled() &&
+            DiskUsageBroadcaster.instance.hasStuffedOrFullNode())
+        {
+            Keyspace keyspace = Keyspace.open(keyspace());
+            // If the keyspace is using NetworkTopologyStrategy then we can 
check each datacenter on which
+            // the keyspace is replicated.
+            if (keyspace.getMetadata().replicationStrategy instanceof 
NetworkTopologyStrategy)
+            {
+                for (String datacenter : ((NetworkTopologyStrategy) 
keyspace.getMetadata().replicationStrategy).getDatacenters())
+                {
+                    
Guardrails.diskUsageKeyspaceWideProtection.guard(datacenter, state);
+                }
+            }
+            // Otherwise, if we are using SimpleStrategy then we have to check 
if any datacenter contains a full node.
+            else
+            {
+                for (String datacenter : 
ClusterMetadata.current().directory.knownDatacenters())
+                {
+                    
Guardrails.diskUsageKeyspaceWideProtection.guard(datacenter, state);
+                }
+            }
+        }
+        else if (Guardrails.replicaDiskUsage.enabled(state) && 
DiskUsageBroadcaster.instance.hasStuffedOrFullNode())
         {
             Keyspace keyspace = Keyspace.open(keyspace());
-
             for (ByteBuffer key : buildPartitionKeyNames(options, state))
             {
                 Token token = metadata().partitioner.getToken(key);
-
                 for (Replica replica : 
ReplicaLayout.forTokenWriteLiveAndDown(keyspace, token).all())
                 {
                     Guardrails.replicaDiskUsage.guard(replica.endpoint(), 
state);
diff --git a/src/java/org/apache/cassandra/db/guardrails/Guardrails.java 
b/src/java/org/apache/cassandra/db/guardrails/Guardrails.java
index 7de915190f..6738d9a7d9 100644
--- a/src/java/org/apache/cassandra/db/guardrails/Guardrails.java
+++ b/src/java/org/apache/cassandra/db/guardrails/Guardrails.java
@@ -556,6 +556,19 @@ public final class Guardrails implements GuardrailsMBean
                      (isWarning, value) ->
                      isWarning ? "Replica disk usage exceeds warning threshold"
                                : "Write request failed because disk usage 
exceeds failure threshold");
+    /**
+     * Guardrail on the data disk usage of replicas across a datacenter which 
replicates a given keyspace.
+     * This is used at write time to verify the status of any node which might 
replicate a given keyspace.
+     */
+    public static final Predicates<String> diskUsageKeyspaceWideProtection =
+    new Predicates<>("disk_usage_keyspace_wide_protection",
+                     null,
+                     state -> 
DiskUsageBroadcaster.instance::isDatacenterStuffed,
+                     state -> DiskUsageBroadcaster.instance::isDatacenterFull,
+                     (isWarning, value) ->
+                     isWarning ? "Disk usage in keyspace datacenter exceeds 
warning threshold"
+                               : "Write request failed because disk usage 
exceeds failure threshold in keyspace datacenter.");
+
     /**
      * Guardrail on passwords for CREATE / ALTER ROLE statements.
      */
@@ -1597,6 +1610,18 @@ public final class Guardrails implements GuardrailsMBean
         DEFAULT_CONFIG.setDataDiskUsageMaxDiskSize(sizeFromString(size));
     }
 
+    @Override
+    public boolean getDataDiskUsageKeyspaceWideProtectionEnabled()
+    {
+        return DEFAULT_CONFIG.getDataDiskUsageKeyspaceWideProtectionEnabled();
+    }
+
+    @Override
+    public void setDataDiskUsageKeyspaceWideProtectionEnabled(boolean enabled)
+    {
+        DEFAULT_CONFIG.setDataDiskUsageKeyspaceWideProtectionEnabled(enabled);
+    }
+
     @Override
     public int getMinimumReplicationFactorWarnThreshold()
     {
diff --git a/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java 
b/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java
index 108b721fa2..e2e9cd74dd 100644
--- a/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java
+++ b/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java
@@ -874,6 +874,17 @@ public interface GuardrailsMBean
     @Nullable
     String getDataDiskUsageMaxDiskSize();
 
+    /**
+     * @return whether a single full node replicating a given keyspace should 
block writes for the
+     * entire keyspace: {@code true} if this behavior is enabled, {@code false} otherwise.
+     */
+    boolean getDataDiskUsageKeyspaceWideProtectionEnabled();
+
+    /**
+     * @param enabled Enables or disables blocking writes for a keyspace if a 
node replicating that keyspace is full.
+     */
+    void setDataDiskUsageKeyspaceWideProtectionEnabled(boolean enabled);
+
     /**
      * @param size The max disk size of the data directories when calculating 
disk usage thresholds, as a string
      *             formatted as in, for example, {@code 10GiB}, {@code 20MiB}, 
{@code 30KiB} or {@code 40B}.
diff --git 
a/src/java/org/apache/cassandra/service/disk/usage/DiskUsageBroadcaster.java 
b/src/java/org/apache/cassandra/service/disk/usage/DiskUsageBroadcaster.java
index 9f638e3313..ceb8265318 100644
--- a/src/java/org/apache/cassandra/service/disk/usage/DiskUsageBroadcaster.java
+++ b/src/java/org/apache/cassandra/service/disk/usage/DiskUsageBroadcaster.java
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.service.disk.usage;
 
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
@@ -27,13 +28,16 @@ import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.EndpointState;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
 import org.apache.cassandra.gms.VersionedValue;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Locator;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.tcm.membership.Location;
 import org.apache.cassandra.utils.NoSpamLogger;
 
 /**
@@ -50,6 +54,8 @@ public class DiskUsageBroadcaster implements 
IEndpointStateChangeSubscriber
     private final DiskUsageMonitor monitor;
     private final ConcurrentMap<InetAddressAndPort, DiskUsageState> usageInfo 
= new ConcurrentHashMap<>();
     private volatile boolean hasStuffedOrFullNode = false;
+    private final ConcurrentMap<String, Set<InetAddressAndPort>> 
fullNodesByDatacenter = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, Set<InetAddressAndPort>> 
stuffedNodesByDatacenter = new ConcurrentHashMap<>();
 
     @VisibleForTesting
     public DiskUsageBroadcaster(DiskUsageMonitor monitor)
@@ -83,6 +89,34 @@ public class DiskUsageBroadcaster implements 
IEndpointStateChangeSubscriber
         return state(endpoint).isStuffed();
     }
 
+    /**
+     * @return {@code true} if there exists any node in the given 
{@code datacenter} which has FULL disk usage.
+     */
+    @VisibleForTesting
+    public boolean isDatacenterFull(String datacenter)
+    {
+        if (!hasStuffedOrFullNode())
+        {
+            return false;
+        }
+        Set<InetAddressAndPort> fullNodes = 
fullNodesByDatacenter.get(datacenter);
+        return fullNodes != null && !fullNodes.isEmpty();
+    }
+
+    /**
+     * @return {@code true} if there exists any node in the given 
{@code datacenter} which has STUFFED disk usage
+     */
+    @VisibleForTesting
+    public boolean isDatacenterStuffed(String datacenter)
+    {
+        if (!hasStuffedOrFullNode())
+        {
+            return false;
+        }
+        Set<InetAddressAndPort> stuffedNodes = 
stuffedNodesByDatacenter.get(datacenter);
+        return stuffedNodes != null && !stuffedNodes.isEmpty();
+    }
+
     @VisibleForTesting
     public DiskUsageState state(InetAddressAndPort endpoint)
     {
@@ -114,8 +148,9 @@ public class DiskUsageBroadcaster implements 
IEndpointStateChangeSubscriber
             noSpamLogger.warn(String.format("Found unknown DiskUsageState: %s. 
Using default state %s instead.",
                                             value.value, usageState));
         }
-        usageInfo.put(endpoint, usageState);
 
+        computeUsageStateForEpDatacenter(endpoint, usageState);
+        usageInfo.put(endpoint, usageState);
         hasStuffedOrFullNode = usageState.isStuffedOrFull() || 
computeHasStuffedOrFullNode();
     }
 
@@ -131,6 +166,85 @@ public class DiskUsageBroadcaster implements 
IEndpointStateChangeSubscriber
         return false;
     }
 
+    /**
+     * Update the sets of full and stuffed nodes by datacenter based on the disk 
usage state for the given endpoint.
+     * A FULL or STUFFED node is added to the matching set for its datacenter and 
removed from the other set; any other state removes it from both sets.
+     * This method is idempotent - adding an already-present node or removing 
an absent node has no effect.
+     *
+     * @param endpoint   The endpoint whose state has changed.
+     * @param usageState The new disk usage state value.
+     */
+    private void computeUsageStateForEpDatacenter(InetAddressAndPort endpoint, 
DiskUsageState usageState)
+    {
+        Location location = location(endpoint);
+        if (location.equals(Location.UNKNOWN))
+        {
+            noSpamLogger.warn("Unable to track disk usage by datacenter for 
endpoint {} because we are unable to determine its location.",
+                         endpoint);
+            return;
+        }
+
+        String datacenter = location.datacenter;
+        if (usageState.isFull())
+        {
+            // Add this node to the set of full nodes for its datacenter and 
remove it from the stuffed nodes
+            // if it was there.
+            fullNodesByDatacenter.computeIfAbsent(datacenter, dc -> 
ConcurrentHashMap.newKeySet())
+                                 .add(endpoint);
+            noSpamLogger.debug("Endpoint {} is FULL, added to full nodes set 
for datacenter {}", endpoint, datacenter);
+            Set<InetAddressAndPort> stuffedNodes = 
stuffedNodesByDatacenter.get(datacenter);
+            if (stuffedNodes != null && stuffedNodes.remove(endpoint))
+            {
+                noSpamLogger.debug("Endpoint {} is now FULL. Removed it from 
the stuffed nodes set for datacenter {}",
+                             endpoint, datacenter);
+            }
+        }
+        else if (usageState.isStuffed())
+        {
+            // Add this node to the set of stuffed nodes for its datacenter 
and remove it from the full nodes
+            // if it was there.
+            stuffedNodesByDatacenter.computeIfAbsent(datacenter, dc -> 
ConcurrentHashMap.newKeySet())
+                                    .add(endpoint);
+            noSpamLogger.debug("Endpoint {} is now STUFFED. Added it to the 
stuffed nodes set for datacenter {}",
+                         endpoint, datacenter);
+            Set<InetAddressAndPort> fullNodes = 
fullNodesByDatacenter.get(datacenter);
+            if (fullNodes != null && fullNodes.remove(endpoint))
+            {
+                noSpamLogger.debug("Endpoint {} is now STUFFED. Removed it 
from full nodes set for datacenter {}",
+                             endpoint, datacenter);
+            }
+        }
+        else
+        {
+            // Remove this node from the set of full nodes and set of stuffed 
nodes for its datacenter if it was there.
+            Set<InetAddressAndPort> fullNodes = 
fullNodesByDatacenter.get(datacenter);
+            if (fullNodes != null && fullNodes.remove(endpoint))
+            {
+                noSpamLogger.debug("Endpoint {} is no longer STUFFED or FULL, 
removed from stuffed for datacenter {}",
+                             endpoint, datacenter);
+            }
+            Set<InetAddressAndPort> stuffedNodes = 
stuffedNodesByDatacenter.get(datacenter);
+            if (stuffedNodes != null && stuffedNodes.remove(endpoint))
+            {
+                noSpamLogger.debug("Endpoint {} is no longer STUFFED, removed 
from the stuffed set for datacenter {}",
+                             endpoint, datacenter);
+            }
+        }
+    }
+
+    private Location location(InetAddressAndPort endpoint)
+    {
+        Locator locator = DatabaseDescriptor.getLocator();
+        if (locator == null)
+        {
+            noSpamLogger.warn("Unable to track disk usage by datacenter for 
endpoint {} because locator is null",
+                              endpoint);
+            return Location.UNKNOWN;
+        }
+        Location location = locator.location(endpoint);
+        return location != null ? location : Location.UNKNOWN;
+    }
+
     @Override
     public void onJoin(InetAddressAndPort endpoint, EndpointState epState)
     {
@@ -164,10 +278,34 @@ public class DiskUsageBroadcaster implements 
IEndpointStateChangeSubscriber
     @Override
     public void onRemove(InetAddressAndPort endpoint)
     {
+        updateDiskUsageStateForDatacenterOnRemoval(endpoint);
         usageInfo.remove(endpoint);
         hasStuffedOrFullNode = 
usageInfo.values().stream().anyMatch(DiskUsageState::isStuffedOrFull);
     }
 
+    private void updateDiskUsageStateForDatacenterOnRemoval(InetAddressAndPort 
endpoint)
+    {
+        Location nodeLocation = location(endpoint);
+        if (nodeLocation.equals(Location.UNKNOWN))
+        {
+            logger.debug("Unable to determine location for removed endpoint 
{}. Will not update datacenter tracking.", endpoint);
+            return;
+        }
+
+        String datacenter = nodeLocation.datacenter;
+        // Remove the endpoint from the full nodes and stuffed nodes set for 
its datacenter
+        Set<InetAddressAndPort> fullNodes = 
fullNodesByDatacenter.get(datacenter);
+        if (fullNodes != null && fullNodes.remove(endpoint))
+        {
+            logger.debug("Removed endpoint {} from full nodes set for 
datacenter {} on node removal", endpoint, datacenter);
+        }
+        Set<InetAddressAndPort> stuffedNodes = 
stuffedNodesByDatacenter.get(datacenter);
+        if (stuffedNodes != null && stuffedNodes.remove(endpoint))
+        {
+            logger.debug("Removed endpoint {} from stuffed nodes set for 
datacenter {} on node removal", endpoint, datacenter);
+        }
+    }
+
     private void updateDiskUsage(InetAddressAndPort endpoint, EndpointState 
state)
     {
         VersionedValue localValue = 
state.getApplicationState(ApplicationState.DISK_USAGE);
diff --git a/src/java/org/apache/cassandra/utils/NoSpamLogger.java 
b/src/java/org/apache/cassandra/utils/NoSpamLogger.java
index 41ffeec50d..a2bf815b30 100644
--- a/src/java/org/apache/cassandra/utils/NoSpamLogger.java
+++ b/src/java/org/apache/cassandra/utils/NoSpamLogger.java
@@ -235,6 +235,16 @@ public class NoSpamLogger
         return new NoSpamLogger(wrapped, minInterval, timeUnit);
     }
 
+    public boolean debug(long nowNanos, String s, Object... objects)
+    {
+        return NoSpamLogger.this.log( Level.DEBUG, s, nowNanos, objects);
+    }
+
+    public boolean debug(String s, Object... objects)
+    {
+        return NoSpamLogger.this.debug(CLOCK.nanoTime(), s, objects);
+    }
+
     public boolean info(long nowNanos, String s, Object... objects)
     {
         return NoSpamLogger.this.log( Level.INFO, s, nowNanos, objects);
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailDataDiskUsageKeyspaceWideProtectionTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailDataDiskUsageKeyspaceWideProtectionTest.java
new file mode 100644
index 0000000000..e8f9de648d
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailDataDiskUsageKeyspaceWideProtectionTest.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.guardrails;
+
+import java.io.IOException;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.InvalidQueryException;
+
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.db.guardrails.Guardrails;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.distributed.util.Auth;
+import org.apache.cassandra.service.disk.usage.DiskUsageBroadcaster;
+import org.apache.cassandra.service.disk.usage.DiskUsageState;
+
+import static 
org.apache.cassandra.config.CassandraRelevantProperties.BOOTSTRAP_SKIP_SCHEMA_CHECK;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.awaitRingJoin;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.replaceHostAndStart;
+
+public class GuardrailDataDiskUsageKeyspaceWideProtectionTest extends 
TestBaseImpl
+{
+    public static final int NODE_TO_MARK_AS_FULL = 2;
+    private static final int NUM_ROWS = 100;
+    private static final String NTS_KEYSPACE_NAME = "nts_keyspace1";
+    private static Cluster cluster;
+    private static com.datastax.driver.core.Cluster driverCluster;
+    private static Session driverSession;
+
+    @Before
+    public void setupCluster() throws IOException
+    {
+        // speed up the task that calculates and propagates the disk usage info
+        CassandraRelevantProperties.DISK_USAGE_MONITOR_INTERVAL_MS.setInt(100);
+        TokenSupplier even = TokenSupplier.evenlyDistributedTokens(2);
+        // build a 2-node cluster with RF=1
+        cluster = init(Cluster.build(2)
+                              
.withInstanceInitializer(GuardrailDiskUsageTest.DiskStateInjection::install)
+                              .withConfig(c -> c.with(Feature.GOSSIP, 
Feature.NETWORK, Feature.NATIVE_PROTOCOL)
+                                                
.set("data_disk_usage_max_disk_size", "10GiB")
+                                                
.set("data_disk_usage_percentage_warn_threshold", 98)
+                                                
.set("data_disk_usage_percentage_fail_threshold", 99)
+                                                
.set("data_disk_usage_keyspace_wide_protection_enabled", true)
+                                                .set("authenticator", 
"PasswordAuthenticator")
+                                                
.set("initial_location_provider", "SimpleLocationProvider"))
+                              .withTokenSupplier(node -> even.token(node == 3 
? 2 : node))
+                              .start(), 1);
+
+        Auth.waitForExistingRoles(cluster.get(1));
+
+        // create a regular user, since the default superuser is excluded from 
guardrails
+        com.datastax.driver.core.Cluster.Builder builder = 
com.datastax.driver.core.Cluster.builder().addContactPoint("127.0.0.1");
+        try (com.datastax.driver.core.Cluster c = 
builder.withCredentials("cassandra", "cassandra").build();
+             Session session = c.connect())
+        {
+            session.execute("CREATE USER test WITH PASSWORD 'test'");
+            session.execute("CREATE KEYSPACE " + NTS_KEYSPACE_NAME + " WITH 
REPLICATION={'class': 'NetworkTopologyStrategy', 'datacenter1': 1}");
+        }
+
+        // connect using that regular user, we use the driver to get access to 
the client warnings
+        driverCluster = builder.withCredentials("test", "test").build();
+        driverSession = driverCluster.connect();
+    }
+
+    @After
+    public void cleanup() throws IOException
+    {
+        if (driverSession != null)
+            driverSession.close();
+
+        if (driverCluster != null)
+            driverCluster.close();
+
+        if (cluster != null)
+            cluster.close();
+    }
+
+    @Test
+    public void testDiskUsageWithStopWritesForKeyspaceOnFail() throws Throwable
+    {
+        String tableName = KEYSPACE + ".guardrail_disk_usage_tbl";
+        testDataDiskUsageKeyspaceWideProtectionGuardrailCommon(tableName);
+    }
+
+    @Test
+    public void 
testDataDiskUsageKeyspaceWideProtectionGuardrailDatacenterFullAndNetworkTopologyStrategyUsedShouldBlockWrites()
+    {
+        String tableName = NTS_KEYSPACE_NAME + ".guardrail_disk_usage_tbl";
+        testDataDiskUsageKeyspaceWideProtectionGuardrailCommon(tableName);
+    }
+
+    private static void 
testDataDiskUsageKeyspaceWideProtectionGuardrailCommon(String tableName)
+    {
+        cluster.schemaChange(String.format("CREATE TABLE %s (k int PRIMARY 
KEY, v int)", tableName));
+        String insert = String.format("INSERT INTO %s(k, v) VALUES (?, 0)", 
tableName);
+        ensureGuardrailCommon(insert, NODE_TO_MARK_AS_FULL);
+
+        // Finally, if both nodes go back to SPACIOUS, all queries will 
succeed again
+        GuardrailDiskUsageTest.DiskStateInjection.setState(cluster, 
NODE_TO_MARK_AS_FULL, DiskUsageState.SPACIOUS);
+        for (int i = 0; i < NUM_ROWS; i++)
+        {
+            ResultSet rs = driverSession.execute(insert, i);
+            
Assertions.assertThat(rs.getExecutionInfo().getWarnings()).isEmpty();
+        }
+    }
+
+
+
+    @Test
+    public void 
testDiskUsageWithStopWriteForKeyspaceWhenFullNodeReplaceWithSpaciousShouldNotBlock()
 throws Throwable
+    {
+        String tableName = KEYSPACE + ".guardrail_disk_usage_tbl";
+        cluster.schemaChange(String.format("CREATE TABLE %s (k int PRIMARY 
KEY, v int)", tableName));
+        String insert = String.format("INSERT INTO %s(k, v) VALUES (?, 0)", 
tableName);
+
+        ensureGuardrailCommon(insert, NODE_TO_MARK_AS_FULL);
+        // When we replace the node with a SPACIOUS node, then we should 
succeed again.
+        IInvokableInstance nodeToRemove = cluster.get(NODE_TO_MARK_AS_FULL);
+        ClusterUtils.stopUnchecked(nodeToRemove);
+        IInvokableInstance replacingNode = replaceHostAndStart(cluster, 
nodeToRemove, props -> {
+            // since we have a downed host there might be a schema version 
which is old show up but
+            // can't be fetched since the host is down...
+            props.set(BOOTSTRAP_SKIP_SCHEMA_CHECK, true);
+        });
+        awaitRingJoin(cluster.get(1), replacingNode);
+        awaitRingJoin(replacingNode, cluster.get(1));
+        for (int i = 0; i < NUM_ROWS; i++)
+        {
+            ResultSet rs = driverSession.execute(insert, i);
+            
Assertions.assertThat(rs.getExecutionInfo().getWarnings()).isEmpty();
+        }
+
+    }
+
+    @Test
+    public void 
testDiskUsageWithStopWriteForKeyspaceWhenFullNodeIpChangeAndBecomesSpaciousShouldNotBlock()
 throws Throwable
+    {
+        String tableName = KEYSPACE + ".guardrail_disk_usage_tbl";
+        cluster.schemaChange(String.format("CREATE TABLE %s (k int PRIMARY 
KEY, v int)", tableName));
+        String insert = String.format("INSERT INTO %s(k, v) VALUES (?, 0)", 
tableName);
+        ensureGuardrailCommon(insert, NODE_TO_MARK_AS_FULL);
+
+        // If the node goes offline then comes online with a different IP, 
then we should still fail.
+        IInvokableInstance nodeToChangeIp = cluster.get(NODE_TO_MARK_AS_FULL);
+        ClusterUtils.stopUnchecked(nodeToChangeIp);
+        ClusterUtils.updateAddress(nodeToChangeIp, "127.0.0.4");
+        nodeToChangeIp.startup();
+        GuardrailDiskUsageTest.DiskStateInjection.setState(cluster, 
NODE_TO_MARK_AS_FULL, DiskUsageState.FULL);
+        int numFailures = 0;
+        for (int i = 0; i < NUM_ROWS; i++)
+        {
+            try
+            {
+                driverSession.execute(insert, i);
+                Assertions.fail("Should have failed");
+            }
+            catch (InvalidQueryException e)
+            {
+                numFailures++;
+            }
+        }
+        Assertions.assertThat(numFailures).isEqualTo(NUM_ROWS);
+
+        // If the node then becomes SPACIOUS then we should succeed again.
+        GuardrailDiskUsageTest.DiskStateInjection.setState(cluster, 
NODE_TO_MARK_AS_FULL, DiskUsageState.SPACIOUS);
+        for (int i = 0; i < NUM_ROWS; i++)
+        {
+            ResultSet rs = driverSession.execute(insert, i);
+            
Assertions.assertThat(rs.getExecutionInfo().getWarnings()).isEmpty();
+        }
+    }
+
+
+    @Test
+    public void 
testDiskUsageWithStopWritesForKeyspaceOnFailWhenFullNodeLeavesShouldStopBlocking()
+    {
+        String tableName = KEYSPACE + ".guardrail_disk_usage_tbl";
+        cluster.schemaChange(String.format("CREATE TABLE %s (k int PRIMARY 
KEY, v int)", tableName));
+        String insert = String.format("INSERT INTO %s(k, v) VALUES (?, 0)", 
tableName);
+        ensureGuardrailCommon(insert, NODE_TO_MARK_AS_FULL);
+
+        // If the FULL node leaves the cluster, then writes should succeed 
again.
+        IInvokableInstance nodeToRemove = cluster.get(NODE_TO_MARK_AS_FULL);
+        ClusterUtils.decommission(nodeToRemove);
+        for (int i = 0; i < NUM_ROWS; i++)
+        {
+            ResultSet rs = driverSession.execute(insert, i);
+            
Assertions.assertThat(rs.getExecutionInfo().getWarnings()).isEmpty();
+        }
+    }
+
+    @Test
+    public void 
testDiskUsageWithStopWritesForKeyspaceWhenFlagDisabledShouldNotBlockEntireKeyspace()
+    {
+        String tableName = KEYSPACE + ".guardrail_disk_usage_tbl";
+        cluster.schemaChange(String.format("CREATE TABLE %s (k int PRIMARY 
KEY, v int)", tableName));
+        String insert = String.format("INSERT INTO %s(k, v) VALUES (?, 0)", 
tableName);
+        ensureGuardrailCommon(insert, NODE_TO_MARK_AS_FULL);
+
+        // Disable the data_disk_usage_keyspace_wide_protection_enabled flag.
+        // Writes which were destined for the FULL node should fail, but 
others should succeed.
+        cluster.get(1).runOnInstance(() -> 
Guardrails.instance.setDataDiskUsageKeyspaceWideProtectionEnabled(false));
+        int numFailures = 0;
+        for (int i = 0; i < NUM_ROWS; i++)
+        {
+            try
+            {
+                driverSession.execute(insert, i);
+            }
+            catch (InvalidQueryException e)
+            {
+                numFailures++;
+            }
+        }
+        Assertions.assertThat(numFailures).isBetween(1, NUM_ROWS - 1);
+    }
+
+    @Test
+    public void 
testDiskUsageWithStopWritesForKeyspaceWhenFlagToggledAndNodeLeavesShouldReEnableWrites()
+    {
+        String tableName = KEYSPACE + ".guardrail_disk_usage_tbl";
+        cluster.schemaChange(String.format("CREATE TABLE %s (k int PRIMARY 
KEY, v int)", tableName));
+        String insert = String.format("INSERT INTO %s(k, v) VALUES (?, 0)", 
tableName);
+        ensureGuardrailCommon(insert, NODE_TO_MARK_AS_FULL);
+        cluster.get(1).runOnInstance(() -> 
Guardrails.instance.setDataDiskUsageKeyspaceWideProtectionEnabled(false));
+        // If the FULL node leaves the cluster, then writes should succeed 
again.
+        IInvokableInstance nodeToRemove = cluster.get(NODE_TO_MARK_AS_FULL);
+        ClusterUtils.decommission(nodeToRemove);
+        for (int i = 0; i < NUM_ROWS; i++)
+        {
+            ResultSet rs = driverSession.execute(insert, i);
+            
Assertions.assertThat(rs.getExecutionInfo().getWarnings()).isEmpty();
+        }
+        cluster.get(1).runOnInstance(() -> 
Guardrails.instance.setDataDiskUsageKeyspaceWideProtectionEnabled(true));
+        for (int i = 0; i < NUM_ROWS; i++)
+        {
+            ResultSet rs = driverSession.execute(insert, i);
+            
Assertions.assertThat(rs.getExecutionInfo().getWarnings()).isEmpty();
+        }
+        cluster.get(1).runOnInstance(() -> 
Assertions.assertThat(DiskUsageBroadcaster.instance.isDatacenterFull("datacenter1")).isFalse());
+        cluster.get(1).runOnInstance(() -> 
Assertions.assertThat(DiskUsageBroadcaster.instance.isDatacenterStuffed("datacenter1")).isFalse());
+    }
+
+    @Test
+    public void 
testDiskUsageWithStopWritesForKeyspaceWhenFlagToggledAndNodeBecomesSpaciousShouldReEnableWrites()
+    {
+        String tableName = KEYSPACE + ".guardrail_disk_usage_tbl";
+        cluster.schemaChange(String.format("CREATE TABLE %s (k int PRIMARY 
KEY, v int)", tableName));
+        String insert = String.format("INSERT INTO %s(k, v) VALUES (?, 0)", 
tableName);
+        ensureGuardrailCommon(insert, NODE_TO_MARK_AS_FULL);
+        cluster.get(1).runOnInstance(() -> 
Guardrails.instance.setDataDiskUsageKeyspaceWideProtectionEnabled(false));
+        GuardrailDiskUsageTest.DiskStateInjection.setState(cluster, 
NODE_TO_MARK_AS_FULL, DiskUsageState.SPACIOUS);
+        cluster.get(1).runOnInstance(() -> 
Guardrails.instance.setDataDiskUsageKeyspaceWideProtectionEnabled(true));
+        for (int i = 0; i < NUM_ROWS; i++)
+        {
+            ResultSet rs = driverSession.execute(insert, i);
+            
Assertions.assertThat(rs.getExecutionInfo().getWarnings()).isEmpty();
+        }
+        cluster.get(1).runOnInstance(() -> 
Assertions.assertThat(DiskUsageBroadcaster.instance.isDatacenterFull("datacenter1")).isFalse());
+        cluster.get(1).runOnInstance(() -> 
Assertions.assertThat(DiskUsageBroadcaster.instance.isDatacenterStuffed("datacenter1")).isFalse());
+    }
+
+    /**
+     * Ensures that the guardrail works in the common scenario across all 
tests (i.e., with both nodes SPACIOUS we
+     * succeed and with one FULL we fail).
+     *
+     * @param insert The insert statement which we will use to test the 
guardrail.
+     * @param node The node which we will mark as full.
+     */
+    private static void ensureGuardrailCommon(String insert, int node)
+    {
+        // With both nodes in SPACIOUS state, we can write without warnings or failures
+        for (int i = 0; i < NUM_ROWS; i++)
+        {
+            ResultSet rs = driverSession.execute(insert, i);
+            
Assertions.assertThat(rs.getExecutionInfo().getWarnings()).isEmpty();
+        }
+
+        // If the node becomes FULL while data_disk_usage_keyspace_wide_protection_enabled is set,
+        // then all writes will fail regardless of node.
+        GuardrailDiskUsageTest.DiskStateInjection.setState(cluster, node, 
DiskUsageState.FULL);
+        int numFailures = 0;
+        for (int i = 0; i < NUM_ROWS; i++)
+        {
+            try
+            {
+                driverSession.execute(insert, i);
+                Assertions.fail("Should have failed");
+            }
+            catch (InvalidQueryException e)
+            {
+                numFailures++;
+            }
+        }
+        Assertions.assertThat(numFailures).isEqualTo(NUM_ROWS);
+    }
+}
diff --git 
a/test/unit/org/apache/cassandra/tools/nodetool/GuardrailsConfigCommandsTest.java
 
b/test/unit/org/apache/cassandra/tools/nodetool/GuardrailsConfigCommandsTest.java
index d88949b473..37958178b7 100644
--- 
a/test/unit/org/apache/cassandra/tools/nodetool/GuardrailsConfigCommandsTest.java
+++ 
b/test/unit/org/apache/cassandra/tools/nodetool/GuardrailsConfigCommandsTest.java
@@ -198,24 +198,25 @@ public class GuardrailsConfigCommandsTest extends 
CQLTester
 
 
     private static final String ALL_FLAGS_GETTER_OUTPUT =
-    "allow_filtering_enabled                         true\n" +
-    "alter_table_enabled                             true\n" +
-    "bulk_load_enabled                               true\n" +
-    "compact_tables_enabled                          true\n" +
-    "drop_keyspace_enabled                           true\n" +
-    "drop_truncate_table_enabled                     true\n" +
-    "group_by_enabled                                true\n" +
-    "intersect_filtering_query_enabled               true\n" +
-    "intersect_filtering_query_warned                true\n" +
-    "non_partition_restricted_index_query_enabled    true\n" +
-    "read_before_write_list_operations_enabled       true\n" +
-    "secondary_indexes_enabled                       true\n" +
-    "simplestrategy_enabled                          true\n" +
-    "uncompressed_tables_enabled                     true\n" +
-    "user_timestamps_enabled                         true\n" +
-    "vector_type_enabled                             true\n" +
-    "zero_ttl_on_twcs_enabled                        true\n" +
-    "zero_ttl_on_twcs_warned                         true\n";
+    "allow_filtering_enabled                          true\n"  +
+    "alter_table_enabled                              true\n"  +
+    "bulk_load_enabled                                true\n"  +
+    "compact_tables_enabled                           true\n"  +
+    "data_disk_usage_keyspace_wide_protection_enabled false\n" +
+    "drop_keyspace_enabled                            true\n"  +
+    "drop_truncate_table_enabled                      true\n"  +
+    "group_by_enabled                                 true\n"  +
+    "intersect_filtering_query_enabled                true\n"  +
+    "intersect_filtering_query_warned                 true\n"  +
+    "non_partition_restricted_index_query_enabled     true\n"  +
+    "read_before_write_list_operations_enabled        true\n"  +
+    "secondary_indexes_enabled                        true\n"  +
+    "simplestrategy_enabled                           true\n"  +
+    "uncompressed_tables_enabled                      true\n"  +
+    "user_timestamps_enabled                          true\n"  +
+    "vector_type_enabled                              true\n"  +
+    "zero_ttl_on_twcs_enabled                         true\n"  +
+    "zero_ttl_on_twcs_warned                          true\n";
 
     private static final String ALL_THRESHOLDS_GETTER_OUTPUT =
     "collection_list_size_threshold               [null, null]  \n" +


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to