This is an automated email from the ASF dual-hosted git repository. paulo pushed a commit to branch CASSANDRA-21024-trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit b51ea46c44f4c6da143a22ccd233b1bf9b463dd3 Author: ireath <[email protected]> AuthorDate: Thu Nov 13 19:46:11 2025 -0500 CASSANDRA-21024: Add configuration to disk usage guardrails to stop writes across all replicas of a keyspace when any node replicating that keyspace exceeds the disk usage failure threshold. This commit adds a new configuration, data_disk_usage_keyspace_wide_protection_enabled, which ensures that if any node which replicates a given keyspace is full, all writes to that keyspace are blocked. patch by Isaac Reath; reviewed by Stefan Miklosovic, Paulo Motta for CASSANDRA-21024 Closes #4547 --- CHANGES.txt | 1 + conf/cassandra.yaml | 4 + conf/cassandra_latest.yaml | 4 + src/java/org/apache/cassandra/config/Config.java | 1 + .../apache/cassandra/config/GuardrailsOptions.java | 14 + .../cql3/statements/ModificationStatement.java | 31 +- .../apache/cassandra/db/guardrails/Guardrails.java | 25 ++ .../cassandra/db/guardrails/GuardrailsMBean.java | 11 + .../service/disk/usage/DiskUsageBroadcaster.java | 140 ++++++++- .../org/apache/cassandra/utils/NoSpamLogger.java | 10 + ...ailDataDiskUsageKeyspaceWideProtectionTest.java | 324 +++++++++++++++++++++ .../nodetool/GuardrailsConfigCommandsTest.java | 37 +-- 12 files changed, 579 insertions(+), 23 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 3adaa9be91..c2df8be752 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 5.1 + * Add configuration to disk usage guardrails to stop writes across all replicas of a keyspace when any node replicating that keyspace exceeds the disk usage failure threshold. 
(CASSANDRA-21024) * BETWEEN where token(Y) > token(Z) returns wrong answer (CASSANDRA-20154) * Optimize memtable flush logic (CASSANDRA-21083) * No need to evict already prepared statements, as it creates a race condition between multiple threads (CASSANDRA-17401) diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index b207ff289b..5f3a935be3 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -2517,6 +2517,10 @@ drop_compact_storage_enabled: false # Min unit: B # data_disk_usage_max_disk_size: # +# Configures the disk usage guardrails to block all writes to a keyspace if any node which replicates that keyspace +# is full. By default, this is disabled. +# data_disk_usage_keyspace_wide_protection_enabled: false +# # Guardrail to warn or fail when the minimum replication factor is lesser than threshold. # This would also apply to system keyspaces. # Suggested value for use in production: 2 or higher diff --git a/conf/cassandra_latest.yaml b/conf/cassandra_latest.yaml index 2e5c8fee6b..17638deadb 100644 --- a/conf/cassandra_latest.yaml +++ b/conf/cassandra_latest.yaml @@ -2295,6 +2295,10 @@ drop_compact_storage_enabled: false # Min unit: B # data_disk_usage_max_disk_size: # +# Configures the disk usage guardrails to block all writes to a keyspace if any node which replicates that keyspace +# is full. By default, this is disabled. +# data_disk_usage_keyspace_wide_protection_enabled: false +# # Guardrail to warn or fail when the minimum replication factor is lesser than threshold. # This would also apply to system keyspaces. 
# Suggested value for use in production: 2 or higher diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 193c77c8f4..2f290d8182 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -985,6 +985,7 @@ public class Config public volatile int data_disk_usage_percentage_warn_threshold = -1; public volatile int data_disk_usage_percentage_fail_threshold = -1; public volatile DataStorageSpec.LongBytesBound data_disk_usage_max_disk_size = null; + public volatile boolean data_disk_usage_keyspace_wide_protection_enabled = false; public volatile int minimum_replication_factor_warn_threshold = -1; public volatile int minimum_replication_factor_fail_threshold = -1; public volatile int maximum_replication_factor_warn_threshold = -1; diff --git a/src/java/org/apache/cassandra/config/GuardrailsOptions.java b/src/java/org/apache/cassandra/config/GuardrailsOptions.java index d10a2490ef..7443565d89 100644 --- a/src/java/org/apache/cassandra/config/GuardrailsOptions.java +++ b/src/java/org/apache/cassandra/config/GuardrailsOptions.java @@ -989,6 +989,20 @@ public class GuardrailsOptions implements GuardrailsConfig x -> config.data_disk_usage_percentage_fail_threshold = x); } + + public boolean getDataDiskUsageKeyspaceWideProtectionEnabled() + { + return config.data_disk_usage_keyspace_wide_protection_enabled; + } + + public void setDataDiskUsageKeyspaceWideProtectionEnabled(boolean enabled) + { + updatePropertyWithLogging("data_disk_usage_keyspace_wide_protection_enabled", + enabled, + () -> config.data_disk_usage_keyspace_wide_protection_enabled, + x -> config.data_disk_usage_keyspace_wide_protection_enabled = x); + } + @Override public DataStorageSpec.LongBytesBound getDataDiskUsageMaxDiskSize() { diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java index 6f37effc5c..118f2c1fa4 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java @@ -103,6 +103,7 @@ import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.exceptions.RequestExecutionException; import org.apache.cassandra.exceptions.RequestValidationException; import org.apache.cassandra.exceptions.UnauthorizedException; +import org.apache.cassandra.locator.NetworkTopologyStrategy; import org.apache.cassandra.locator.Replica; import org.apache.cassandra.locator.ReplicaLayout; import org.apache.cassandra.metrics.ClientRequestSizeMetrics; @@ -124,6 +125,7 @@ import org.apache.cassandra.service.disk.usage.DiskUsageBroadcaster; import org.apache.cassandra.service.paxos.Ballot; import org.apache.cassandra.service.paxos.BallotGenerator; import org.apache.cassandra.service.paxos.Commit.Proposal; +import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.transport.Dispatcher; import org.apache.cassandra.transport.messages.ResultMessage; import org.apache.cassandra.triggers.TriggerExecutor; @@ -412,15 +414,36 @@ public abstract class ModificationStatement implements CQLStatement.SingleKeyspa public void validateDiskUsage(QueryOptions options, ClientState state) { - // reject writes if any replica exceeds disk usage failure limit or warn if it exceeds warn limit - if (Guardrails.replicaDiskUsage.enabled(state) && DiskUsageBroadcaster.instance.hasStuffedOrFullNode()) + + if (Guardrails.diskUsageKeyspaceWideProtection.enabled(state) && + Guardrails.instance.getDataDiskUsageKeyspaceWideProtectionEnabled() && + DiskUsageBroadcaster.instance.hasStuffedOrFullNode()) + { + Keyspace keyspace = Keyspace.open(keyspace()); + // If the keyspace is using NetworkTopologyStrategy then we can check each datacenter on which + // the keyspace is 
replicated. + if (keyspace.getMetadata().replicationStrategy instanceof NetworkTopologyStrategy) + { + for (String datacenter : ((NetworkTopologyStrategy) keyspace.getMetadata().replicationStrategy).getDatacenters()) + { + Guardrails.diskUsageKeyspaceWideProtection.guard(datacenter, state); + } + } + // Otherwise, if we are using SimpleStrategy then we have to check if any datacenter contains a full node. + else + { + for (String datacenter : ClusterMetadata.current().directory.knownDatacenters()) + { + Guardrails.diskUsageKeyspaceWideProtection.guard(datacenter, state); + } + } + } + else if (Guardrails.replicaDiskUsage.enabled(state) && DiskUsageBroadcaster.instance.hasStuffedOrFullNode()) { Keyspace keyspace = Keyspace.open(keyspace()); - for (ByteBuffer key : buildPartitionKeyNames(options, state)) { Token token = metadata().partitioner.getToken(key); - for (Replica replica : ReplicaLayout.forTokenWriteLiveAndDown(keyspace, token).all()) { Guardrails.replicaDiskUsage.guard(replica.endpoint(), state); diff --git a/src/java/org/apache/cassandra/db/guardrails/Guardrails.java b/src/java/org/apache/cassandra/db/guardrails/Guardrails.java index 7de915190f..6738d9a7d9 100644 --- a/src/java/org/apache/cassandra/db/guardrails/Guardrails.java +++ b/src/java/org/apache/cassandra/db/guardrails/Guardrails.java @@ -556,6 +556,19 @@ public final class Guardrails implements GuardrailsMBean (isWarning, value) -> isWarning ? "Replica disk usage exceeds warning threshold" : "Write request failed because disk usage exceeds failure threshold"); + /** + * Guardrail on the data disk usage of replicas across a datacenter which replicates a given keyspace. + * This is used at write time to verify the status of any node which might replicate a given keyspace. 
+ */ + public static final Predicates<String> diskUsageKeyspaceWideProtection = + new Predicates<>("disk_usage_keyspace_wide_protection", + null, + state -> DiskUsageBroadcaster.instance::isDatacenterStuffed, + state -> DiskUsageBroadcaster.instance::isDatacenterFull, + (isWarning, value) -> + isWarning ? "Disk usage in keyspace datacenter exceeds warning threshold" + : "Write request failed because disk usage exceeds failure threshold in keyspace datacenter."); + /** * Guardrail on passwords for CREATE / ALTER ROLE statements. */ @@ -1597,6 +1610,18 @@ public final class Guardrails implements GuardrailsMBean DEFAULT_CONFIG.setDataDiskUsageMaxDiskSize(sizeFromString(size)); } + @Override + public boolean getDataDiskUsageKeyspaceWideProtectionEnabled() + { + return DEFAULT_CONFIG.getDataDiskUsageKeyspaceWideProtectionEnabled(); + } + + @Override + public void setDataDiskUsageKeyspaceWideProtectionEnabled(boolean enabled) + { + DEFAULT_CONFIG.setDataDiskUsageKeyspaceWideProtectionEnabled(enabled); + } + @Override public int getMinimumReplicationFactorWarnThreshold() { diff --git a/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java b/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java index 108b721fa2..e2e9cd74dd 100644 --- a/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java +++ b/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java @@ -874,6 +874,17 @@ public interface GuardrailsMBean @Nullable String getDataDiskUsageMaxDiskSize(); + /** + * @return Return whether a single node replicating a given keyspace being full should block writes for the + * entire keyspace. Returns true if this behavior is set, false otherwise. + */ + boolean getDataDiskUsageKeyspaceWideProtectionEnabled(); + + /** + * @param enabled Enables or disables blocking writes for a keyspace if a node replicating that keyspace is full. 
+ */ + void setDataDiskUsageKeyspaceWideProtectionEnabled(boolean enabled); + /** * @param size The max disk size of the data directories when calculating disk usage thresholds, as a string * formatted as in, for example, {@code 10GiB}, {@code 20MiB}, {@code 30KiB} or {@code 40B}. diff --git a/src/java/org/apache/cassandra/service/disk/usage/DiskUsageBroadcaster.java b/src/java/org/apache/cassandra/service/disk/usage/DiskUsageBroadcaster.java index 9f638e3313..ceb8265318 100644 --- a/src/java/org/apache/cassandra/service/disk/usage/DiskUsageBroadcaster.java +++ b/src/java/org/apache/cassandra/service/disk/usage/DiskUsageBroadcaster.java @@ -18,6 +18,7 @@ package org.apache.cassandra.service.disk.usage; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; @@ -27,13 +28,16 @@ import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.gms.ApplicationState; import org.apache.cassandra.gms.EndpointState; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.gms.IEndpointStateChangeSubscriber; import org.apache.cassandra.gms.VersionedValue; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.Locator; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.tcm.membership.Location; import org.apache.cassandra.utils.NoSpamLogger; /** @@ -50,6 +54,8 @@ public class DiskUsageBroadcaster implements IEndpointStateChangeSubscriber private final DiskUsageMonitor monitor; private final ConcurrentMap<InetAddressAndPort, DiskUsageState> usageInfo = new ConcurrentHashMap<>(); private volatile boolean hasStuffedOrFullNode = false; + private final ConcurrentMap<String, Set<InetAddressAndPort>> fullNodesByDatacenter = new ConcurrentHashMap<>(); + private final 
ConcurrentMap<String, Set<InetAddressAndPort>> stuffedNodesByDatacenter = new ConcurrentHashMap<>(); @VisibleForTesting public DiskUsageBroadcaster(DiskUsageMonitor monitor) @@ -83,6 +89,34 @@ public class DiskUsageBroadcaster implements IEndpointStateChangeSubscriber return state(endpoint).isStuffed(); } + /** + * @return {@code true} if there exists any node in the given {@code datacenter} which has FULL disk usage. + */ + @VisibleForTesting + public boolean isDatacenterFull(String datacenter) + { + if (!hasStuffedOrFullNode()) + { + return false; + } + Set<InetAddressAndPort> fullNodes = fullNodesByDatacenter.get(datacenter); + return fullNodes != null && !fullNodes.isEmpty(); + } + + /** + * @return {@code true} if there exists any node in the given {@code datacenter} which has STUFFED disk usage. + */ + @VisibleForTesting + public boolean isDatacenterStuffed(String datacenter) + { + if (!hasStuffedOrFullNode()) + { + return false; + } + Set<InetAddressAndPort> stuffedNodes = stuffedNodesByDatacenter.get(datacenter); + return stuffedNodes != null && !stuffedNodes.isEmpty(); + } + @VisibleForTesting public DiskUsageState state(InetAddressAndPort endpoint) { @@ -114,8 +148,9 @@ public class DiskUsageBroadcaster implements IEndpointStateChangeSubscriber noSpamLogger.warn(String.format("Found unknown DiskUsageState: %s. Using default state %s instead.", value.value, usageState)); } - usageInfo.put(endpoint, usageState); + computeUsageStateForEpDatacenter(endpoint, usageState); + usageInfo.put(endpoint, usageState); hasStuffedOrFullNode = usageState.isStuffedOrFull() || computeHasStuffedOrFullNode(); } @@ -131,6 +166,85 @@ public class DiskUsageBroadcaster implements IEndpointStateChangeSubscriber return false; } + /** + * Update the sets of full and stuffed nodes by datacenter based on the disk usage state for the given endpoint. + * If the node is FULL or STUFFED, add it to the matching set for its datacenter and remove it from the other. Otherwise, remove it from both sets. 
+ * This method is idempotent - adding an already-present node or removing an absent node has no effect. + * + * @param endpoint The endpoint whose state has changed. + * @param usageState The new disk usage state value. + */ + private void computeUsageStateForEpDatacenter(InetAddressAndPort endpoint, DiskUsageState usageState) + { + Location location = location(endpoint); + if (location.equals(Location.UNKNOWN)) + { + noSpamLogger.warn("Unable to track disk usage by datacenter for endpoint {} because we are unable to determine its location.", + endpoint); + return; + } + + String datacenter = location.datacenter; + if (usageState.isFull()) + { + // Add this node to the set of full nodes for its datacenter and remove it from the stuffed nodes + // if it was there. + fullNodesByDatacenter.computeIfAbsent(datacenter, dc -> ConcurrentHashMap.newKeySet()) + .add(endpoint); + noSpamLogger.debug("Endpoint {} is FULL, added to full nodes set for datacenter {}", endpoint, datacenter); + Set<InetAddressAndPort> stuffedNodes = stuffedNodesByDatacenter.get(datacenter); + if (stuffedNodes != null && stuffedNodes.remove(endpoint)) + { + noSpamLogger.debug("Endpoint {} is now FULL. Removed it from the stuffed nodes set for datacenter {}", + endpoint, datacenter); + } + } + else if (usageState.isStuffed()) + { + // Add this node to the set of stuffed nodes for its datacenter and remove it from the full nodes + // if it was there. + stuffedNodesByDatacenter.computeIfAbsent(datacenter, dc -> ConcurrentHashMap.newKeySet()) + .add(endpoint); + noSpamLogger.debug("Endpoint {} is now STUFFED. Added it to the stuffed nodes set for datacenter {}", + endpoint, datacenter); + Set<InetAddressAndPort> fullNodes = fullNodesByDatacenter.get(datacenter); + if (fullNodes != null && fullNodes.remove(endpoint)) + { + noSpamLogger.debug("Endpoint {} is now STUFFED. 
Removed it from full nodes set for datacenter {}", + endpoint, datacenter); + } + } + else + { + // Remove this node from the set of full nodes and set of stuffed nodes for its datacenter if it was there. + Set<InetAddressAndPort> fullNodes = fullNodesByDatacenter.get(datacenter); + if (fullNodes != null && fullNodes.remove(endpoint)) + { + noSpamLogger.debug("Endpoint {} is no longer STUFFED or FULL, removed from stuffed for datacenter {}", + endpoint, datacenter); + } + Set<InetAddressAndPort> stuffedNodes = stuffedNodesByDatacenter.get(datacenter); + if (stuffedNodes != null && stuffedNodes.remove(endpoint)) + { + noSpamLogger.debug("Endpoint {} is no longer STUFFED, removed from the stuffed set for datacenter {}", + endpoint, datacenter); + } + } + } + + private Location location(InetAddressAndPort endpoint) + { + Locator locator = DatabaseDescriptor.getLocator(); + if (locator == null) + { + noSpamLogger.warn("Unable to track disk usage by datacenter for endpoint {} because locator is null", + endpoint); + return Location.UNKNOWN; + } + Location location = locator.location(endpoint); + return location != null ? location : Location.UNKNOWN; + } + @Override public void onJoin(InetAddressAndPort endpoint, EndpointState epState) { @@ -164,10 +278,34 @@ public class DiskUsageBroadcaster implements IEndpointStateChangeSubscriber @Override public void onRemove(InetAddressAndPort endpoint) { + updateDiskUsageStateForDatacenterOnRemoval(endpoint); usageInfo.remove(endpoint); hasStuffedOrFullNode = usageInfo.values().stream().anyMatch(DiskUsageState::isStuffedOrFull); } + private void updateDiskUsageStateForDatacenterOnRemoval(InetAddressAndPort endpoint) + { + Location nodeLocation = location(endpoint); + if (nodeLocation.equals(Location.UNKNOWN)) + { + logger.debug("Unable to determine location for removed endpoint {}. 
Will not update datacenter tracking.", endpoint); + return; + } + + String datacenter = nodeLocation.datacenter; + // Remove the endpoint from the full nodes and stuffed nodes set for its datacenter + Set<InetAddressAndPort> fullNodes = fullNodesByDatacenter.get(datacenter); + if (fullNodes != null && fullNodes.remove(endpoint)) + { + logger.debug("Removed endpoint {} from full nodes set for datacenter {} on node removal", endpoint, datacenter); + } + Set<InetAddressAndPort> stuffedNodes = stuffedNodesByDatacenter.get(datacenter); + if (stuffedNodes != null && stuffedNodes.remove(endpoint)) + { + logger.debug("Removed endpoint {} from stuffed nodes set for datacenter {} on node removal", endpoint, datacenter); + } + } + private void updateDiskUsage(InetAddressAndPort endpoint, EndpointState state) { VersionedValue localValue = state.getApplicationState(ApplicationState.DISK_USAGE); diff --git a/src/java/org/apache/cassandra/utils/NoSpamLogger.java b/src/java/org/apache/cassandra/utils/NoSpamLogger.java index 41ffeec50d..a2bf815b30 100644 --- a/src/java/org/apache/cassandra/utils/NoSpamLogger.java +++ b/src/java/org/apache/cassandra/utils/NoSpamLogger.java @@ -235,6 +235,16 @@ public class NoSpamLogger return new NoSpamLogger(wrapped, minInterval, timeUnit); } + public boolean debug(long nowNanos, String s, Object... objects) + { + return NoSpamLogger.this.log( Level.DEBUG, s, nowNanos, objects); + } + + public boolean debug(String s, Object... objects) + { + return NoSpamLogger.this.debug(CLOCK.nanoTime(), s, objects); + } + public boolean info(long nowNanos, String s, Object... 
objects) { return NoSpamLogger.this.log( Level.INFO, s, nowNanos, objects); diff --git a/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailDataDiskUsageKeyspaceWideProtectionTest.java b/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailDataDiskUsageKeyspaceWideProtectionTest.java new file mode 100644 index 0000000000..e8f9de648d --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailDataDiskUsageKeyspaceWideProtectionTest.java @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.distributed.test.guardrails; + +import java.io.IOException; + +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.InvalidQueryException; + +import org.assertj.core.api.Assertions; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import org.apache.cassandra.config.CassandraRelevantProperties; +import org.apache.cassandra.db.guardrails.Guardrails; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.api.TokenSupplier; +import org.apache.cassandra.distributed.shared.ClusterUtils; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.distributed.util.Auth; +import org.apache.cassandra.service.disk.usage.DiskUsageBroadcaster; +import org.apache.cassandra.service.disk.usage.DiskUsageState; + +import static org.apache.cassandra.config.CassandraRelevantProperties.BOOTSTRAP_SKIP_SCHEMA_CHECK; +import static org.apache.cassandra.distributed.shared.ClusterUtils.awaitRingJoin; +import static org.apache.cassandra.distributed.shared.ClusterUtils.replaceHostAndStart; + +public class GuardrailDataDiskUsageKeyspaceWideProtectionTest extends TestBaseImpl +{ + public static final int NODE_TO_MARK_AS_FULL = 2; + private static final int NUM_ROWS = 100; + private static final String NTS_KEYSPACE_NAME = "nts_keyspace1"; + private static Cluster cluster; + private static com.datastax.driver.core.Cluster driverCluster; + private static Session driverSession; + + @Before + public void setupCluster() throws IOException + { + // speed up the task that calculates and propagates the disk usage info + CassandraRelevantProperties.DISK_USAGE_MONITOR_INTERVAL_MS.setInt(100); + TokenSupplier even = TokenSupplier.evenlyDistributedTokens(2); + // build a 2-node cluster 
with RF=1 + cluster = init(Cluster.build(2) + .withInstanceInitializer(GuardrailDiskUsageTest.DiskStateInjection::install) + .withConfig(c -> c.with(Feature.GOSSIP, Feature.NETWORK, Feature.NATIVE_PROTOCOL) + .set("data_disk_usage_max_disk_size", "10GiB") + .set("data_disk_usage_percentage_warn_threshold", 98) + .set("data_disk_usage_percentage_fail_threshold", 99) + .set("data_disk_usage_keyspace_wide_protection_enabled", true) + .set("authenticator", "PasswordAuthenticator") + .set("initial_location_provider", "SimpleLocationProvider")) + .withTokenSupplier(node -> even.token(node == 3 ? 2 : node)) + .start(), 1); + + Auth.waitForExistingRoles(cluster.get(1)); + + // create a regular user, since the default superuser is excluded from guardrails + com.datastax.driver.core.Cluster.Builder builder = com.datastax.driver.core.Cluster.builder().addContactPoint("127.0.0.1"); + try (com.datastax.driver.core.Cluster c = builder.withCredentials("cassandra", "cassandra").build(); + Session session = c.connect()) + { + session.execute("CREATE USER test WITH PASSWORD 'test'"); + session.execute("CREATE KEYSPACE " + NTS_KEYSPACE_NAME + " WITH REPLICATION={'class': 'NetworkTopologyStrategy', 'datacenter1': 1}"); + } + + // connect using that regular user; we use the driver to get access to the client warnings + driverCluster = builder.withCredentials("test", "test").build(); + driverSession = driverCluster.connect(); + } + + @After + public void cleanup() throws IOException + { + if (driverSession != null) + driverSession.close(); + + if (driverCluster != null) + driverCluster.close(); + + if (cluster != null) + cluster.close(); + } + + @Test + public void testDiskUsageWithStopWritesForKeyspaceOnFail() throws Throwable + { + String tableName = KEYSPACE + ".guardrail_disk_usage_tbl"; + testDataDiskUsageKeyspaceWideProtectionGuardrailCommon(tableName); + } + + @Test + public void 
testDataDiskUsageKeyspaceWideProtectionGuardrailDatacenterFullAndNetworkTopologyStrategyUsedShouldBlockWrites() + { + String tableName = NTS_KEYSPACE_NAME + ".guardrail_disk_usage_tbl"; + testDataDiskUsageKeyspaceWideProtectionGuardrailCommon(tableName); + } + + private static void testDataDiskUsageKeyspaceWideProtectionGuardrailCommon(String tableName) + { + cluster.schemaChange(String.format("CREATE TABLE %s (k int PRIMARY KEY, v int)", tableName)); + String insert = String.format("INSERT INTO %s(k, v) VALUES (?, 0)", tableName); + ensureGuardrailCommon(insert, NODE_TO_MARK_AS_FULL); + + // Finally, if both nodes go back to SPACIOUS, all queries will succeed again + GuardrailDiskUsageTest.DiskStateInjection.setState(cluster, NODE_TO_MARK_AS_FULL, DiskUsageState.SPACIOUS); + for (int i = 0; i < NUM_ROWS; i++) + { + ResultSet rs = driverSession.execute(insert, i); + Assertions.assertThat(rs.getExecutionInfo().getWarnings()).isEmpty(); + } + } + + + + @Test + public void testDiskUsageWithStopWriteForKeyspaceWhenFullNodeReplaceWithSpaciousShouldNotBlock() throws Throwable + { + String tableName = KEYSPACE + ".guardrail_disk_usage_tbl"; + cluster.schemaChange(String.format("CREATE TABLE %s (k int PRIMARY KEY, v int)", tableName)); + String insert = String.format("INSERT INTO %s(k, v) VALUES (?, 0)", tableName); + + ensureGuardrailCommon(insert, NODE_TO_MARK_AS_FULL); + // When we replace the node with a SPACIOUS node, then we should succeed again. + IInvokableInstance nodeToRemove = cluster.get(NODE_TO_MARK_AS_FULL); + ClusterUtils.stopUnchecked(nodeToRemove); + IInvokableInstance replacingNode = replaceHostAndStart(cluster, nodeToRemove, props -> { + // since we have a downed host there might be a schema version which is old show up but + // can't be fetched since the host is down... 
+ props.set(BOOTSTRAP_SKIP_SCHEMA_CHECK, true); + }); + awaitRingJoin(cluster.get(1), replacingNode); + awaitRingJoin(replacingNode, cluster.get(1)); + for (int i = 0; i < NUM_ROWS; i++) + { + ResultSet rs = driverSession.execute(insert, i); + Assertions.assertThat(rs.getExecutionInfo().getWarnings()).isEmpty(); + } + + } + + @Test + public void testDiskUsageWithStopWriteForKeyspaceWhenFullNodeIpChangeAndBecomesSpaciousShouldNotBlock() throws Throwable + { + String tableName = KEYSPACE + ".guardrail_disk_usage_tbl"; + cluster.schemaChange(String.format("CREATE TABLE %s (k int PRIMARY KEY, v int)", tableName)); + String insert = String.format("INSERT INTO %s(k, v) VALUES (?, 0)", tableName); + ensureGuardrailCommon(insert, NODE_TO_MARK_AS_FULL); + + // If the node goes offline then comes online with a different IP, then we should still fail. + IInvokableInstance nodeToChangeIp = cluster.get(NODE_TO_MARK_AS_FULL); + ClusterUtils.stopUnchecked(nodeToChangeIp); + ClusterUtils.updateAddress(nodeToChangeIp, "127.0.0.4"); + nodeToChangeIp.startup(); + GuardrailDiskUsageTest.DiskStateInjection.setState(cluster, NODE_TO_MARK_AS_FULL, DiskUsageState.FULL); + int numFailures = 0; + for (int i = 0; i < NUM_ROWS; i++) + { + try + { + driverSession.execute(insert, i); + Assertions.fail("Should have failed"); + } + catch (InvalidQueryException e) + { + numFailures++; + } + } + Assertions.assertThat(numFailures).isEqualTo(NUM_ROWS); + + // If the node then becomes SPACIOUS then we should succeed again. 
+ GuardrailDiskUsageTest.DiskStateInjection.setState(cluster, NODE_TO_MARK_AS_FULL, DiskUsageState.SPACIOUS); + for (int i = 0; i < NUM_ROWS; i++) + { + ResultSet rs = driverSession.execute(insert, i); + Assertions.assertThat(rs.getExecutionInfo().getWarnings()).isEmpty(); + } + } + + + @Test + public void testDiskUsageWithStopWritesForKeyspaceOnFailWhenFullNodeLeavesShouldStopBlocking() + { + String tableName = KEYSPACE + ".guardrail_disk_usage_tbl"; + cluster.schemaChange(String.format("CREATE TABLE %s (k int PRIMARY KEY, v int)", tableName)); + String insert = String.format("INSERT INTO %s(k, v) VALUES (?, 0)", tableName); + ensureGuardrailCommon(insert, NODE_TO_MARK_AS_FULL); + + // If the FULL node leaves the cluster, then writes should succeed again. + IInvokableInstance nodeToRemove = cluster.get(NODE_TO_MARK_AS_FULL); + ClusterUtils.decommission(nodeToRemove); + for (int i = 0; i < NUM_ROWS; i++) + { + ResultSet rs = driverSession.execute(insert, i); + Assertions.assertThat(rs.getExecutionInfo().getWarnings()).isEmpty(); + } + } + + @Test + public void testDiskUsageWithStopWritesForKeyspaceWhenFlagDisabledShouldNotBlockEntireKeyspace() + { + String tableName = KEYSPACE + ".guardrail_disk_usage_tbl"; + cluster.schemaChange(String.format("CREATE TABLE %s (k int PRIMARY KEY, v int)", tableName)); + String insert = String.format("INSERT INTO %s(k, v) VALUES (?, 0)", tableName); + ensureGuardrailCommon(insert, NODE_TO_MARK_AS_FULL); + + // Disable the stopWritesForKeyspacesOnFail flag. + // Writes which were destined for the FULL node should fail, but others should succeed. 
+ cluster.get(1).runOnInstance(() -> Guardrails.instance.setDataDiskUsageKeyspaceWideProtectionEnabled(false)); + int numFailures = 0; + for (int i = 0; i < NUM_ROWS; i++) + { + try + { + driverSession.execute(insert, i); + } + catch (InvalidQueryException e) + { + numFailures++; + } + } + Assertions.assertThat(numFailures).isBetween(1, NUM_ROWS - 1); + } + + @Test + public void testDiskUsageWithStopWritesForKeyspaceWhenFlagToggledAndNodeLeavesShouldReEnableWrites() + { + String tableName = KEYSPACE + ".guardrail_disk_usage_tbl"; + cluster.schemaChange(String.format("CREATE TABLE %s (k int PRIMARY KEY, v int)", tableName)); + String insert = String.format("INSERT INTO %s(k, v) VALUES (?, 0)", tableName); + ensureGuardrailCommon(insert, NODE_TO_MARK_AS_FULL); + cluster.get(1).runOnInstance(() -> Guardrails.instance.setDataDiskUsageKeyspaceWideProtectionEnabled(false)); + // If the FULL node leaves the cluster, then writes should succeed again. + IInvokableInstance nodeToRemove = cluster.get(NODE_TO_MARK_AS_FULL); + ClusterUtils.decommission(nodeToRemove); + for (int i = 0; i < NUM_ROWS; i++) + { + ResultSet rs = driverSession.execute(insert, i); + Assertions.assertThat(rs.getExecutionInfo().getWarnings()).isEmpty(); + } + cluster.get(1).runOnInstance(() -> Guardrails.instance.setDataDiskUsageKeyspaceWideProtectionEnabled(true)); + for (int i = 0; i < NUM_ROWS; i++) + { + ResultSet rs = driverSession.execute(insert, i); + Assertions.assertThat(rs.getExecutionInfo().getWarnings()).isEmpty(); + } + cluster.get(1).runOnInstance(() -> Assertions.assertThat(DiskUsageBroadcaster.instance.isDatacenterFull("datacenter1")).isFalse()); + cluster.get(1).runOnInstance(() -> Assertions.assertThat(DiskUsageBroadcaster.instance.isDatacenterStuffed("datacenter1")).isFalse()); + } + + @Test + public void testDiskUsageWithStopWritesForKeyspaceWhenFlagToggledAndNodeBecomesSpaciousShouldReEnableWrites() + { + String tableName = KEYSPACE + ".guardrail_disk_usage_tbl"; + 
cluster.schemaChange(String.format("CREATE TABLE %s (k int PRIMARY KEY, v int)", tableName)); + String insert = String.format("INSERT INTO %s(k, v) VALUES (?, 0)", tableName); + ensureGuardrailCommon(insert, NODE_TO_MARK_AS_FULL); + cluster.get(1).runOnInstance(() -> Guardrails.instance.setDataDiskUsageKeyspaceWideProtectionEnabled(false)); + GuardrailDiskUsageTest.DiskStateInjection.setState(cluster, NODE_TO_MARK_AS_FULL, DiskUsageState.SPACIOUS); + cluster.get(1).runOnInstance(() -> Guardrails.instance.setDataDiskUsageKeyspaceWideProtectionEnabled(true)); + for (int i = 0; i < NUM_ROWS; i++) + { + ResultSet rs = driverSession.execute(insert, i); + Assertions.assertThat(rs.getExecutionInfo().getWarnings()).isEmpty(); + } + cluster.get(1).runOnInstance(() -> Assertions.assertThat(DiskUsageBroadcaster.instance.isDatacenterFull("datacenter1")).isFalse()); + cluster.get(1).runOnInstance(() -> Assertions.assertThat(DiskUsageBroadcaster.instance.isDatacenterStuffed("datacenter1")).isFalse()); + } + + /** + * Ensures that the guardrail works in the common scenario across all tests (i.e., with both nodes SPACIOUS we + * succeed and with one FULL we fail). + * + * @param insert The insert statement which we will use to test the guardrail. + * @param node The node which we will mark as full. + */ + private static void ensureGuardrailCommon(String insert, int node) + { + // With both nodes in SPACIOUS state, we can write without warnings or failures + for (int i = 0; i < NUM_ROWS; i++) + { + ResultSet rs = driverSession.execute(insert, i); + Assertions.assertThat(rs.getExecutionInfo().getWarnings()).isEmpty(); + } + + // If the given node becomes FULL while data_disk_usage_keyspace_wide_protection_enabled is set, + // then all writes will fail regardless of node. 
+ GuardrailDiskUsageTest.DiskStateInjection.setState(cluster, node, DiskUsageState.FULL); + int numFailures = 0; + for (int i = 0; i < NUM_ROWS; i++) + { + try + { + driverSession.execute(insert, i); + Assertions.fail("Should have failed"); + } + catch (InvalidQueryException e) + { + numFailures++; + } + } + Assertions.assertThat(numFailures).isEqualTo(NUM_ROWS); + } +} diff --git a/test/unit/org/apache/cassandra/tools/nodetool/GuardrailsConfigCommandsTest.java b/test/unit/org/apache/cassandra/tools/nodetool/GuardrailsConfigCommandsTest.java index d88949b473..37958178b7 100644 --- a/test/unit/org/apache/cassandra/tools/nodetool/GuardrailsConfigCommandsTest.java +++ b/test/unit/org/apache/cassandra/tools/nodetool/GuardrailsConfigCommandsTest.java @@ -198,24 +198,25 @@ public class GuardrailsConfigCommandsTest extends CQLTester private static final String ALL_FLAGS_GETTER_OUTPUT = - "allow_filtering_enabled true\n" + - "alter_table_enabled true\n" + - "bulk_load_enabled true\n" + - "compact_tables_enabled true\n" + - "drop_keyspace_enabled true\n" + - "drop_truncate_table_enabled true\n" + - "group_by_enabled true\n" + - "intersect_filtering_query_enabled true\n" + - "intersect_filtering_query_warned true\n" + - "non_partition_restricted_index_query_enabled true\n" + - "read_before_write_list_operations_enabled true\n" + - "secondary_indexes_enabled true\n" + - "simplestrategy_enabled true\n" + - "uncompressed_tables_enabled true\n" + - "user_timestamps_enabled true\n" + - "vector_type_enabled true\n" + - "zero_ttl_on_twcs_enabled true\n" + - "zero_ttl_on_twcs_warned true\n"; + "allow_filtering_enabled true\n" + + "alter_table_enabled true\n" + + "bulk_load_enabled true\n" + + "compact_tables_enabled true\n" + + "data_disk_usage_keyspace_wide_protection_enabled false\n" + + "drop_keyspace_enabled true\n" + + "drop_truncate_table_enabled true\n" + + "group_by_enabled true\n" + + "intersect_filtering_query_enabled true\n" + + "intersect_filtering_query_warned true\n" + 
+ "non_partition_restricted_index_query_enabled true\n" + + "read_before_write_list_operations_enabled true\n" + + "secondary_indexes_enabled true\n" + + "simplestrategy_enabled true\n" + + "uncompressed_tables_enabled true\n" + + "user_timestamps_enabled true\n" + + "vector_type_enabled true\n" + + "zero_ttl_on_twcs_enabled true\n" + + "zero_ttl_on_twcs_warned true\n"; private static final String ALL_THRESHOLDS_GETTER_OUTPUT = "collection_list_size_threshold [null, null] \n" + --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
