This is an automated email from the ASF dual-hosted git repository.
adelapena pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new b3842de5cf Add guardrail for data disk usage
b3842de5cf is described below
commit b3842de5cf1fa1b81872effb4585fbc7e1873d59
Author: Andrés de la Peña <[email protected]>
AuthorDate: Fri Apr 22 16:36:07 2022 +0100
Add guardrail for data disk usage
patch by Andrés de la Peña; reviewed by Ekaterina Dimitrova and Stefan Miklosovic for CASSANDRA-17150
Co-authored-by: Andrés de la Peña <[email protected]>
Co-authored-by: Zhao Yang <[email protected]>
Co-authored-by: Eduard Tudenhoefner <[email protected]>
---
CHANGES.txt | 1 +
NEWS.txt | 28 +
conf/cassandra.yaml | 24 +-
.../config/CassandraRelevantProperties.java | 8 +
src/java/org/apache/cassandra/config/Config.java | 49 +-
.../apache/cassandra/config/DataStorageSpec.java | 13 +-
.../apache/cassandra/config/GuardrailsOptions.java | 121 +++-
.../org/apache/cassandra/cql3/QueryOptions.java | 9 +-
.../cassandra/cql3/selection/ResultSetBuilder.java | 5 +-
.../cassandra/cql3/statements/BatchStatement.java | 8 +-
.../cql3/statements/ModificationStatement.java | 25 +
src/java/org/apache/cassandra/db/Directories.java | 5 +
src/java/org/apache/cassandra/db/ReadCommand.java | 8 +-
.../apache/cassandra/db/guardrails/Guardrail.java | 92 ++-
.../apache/cassandra/db/guardrails/Guardrails.java | 112 +++-
.../cassandra/db/guardrails/GuardrailsConfig.java | 25 +-
.../cassandra/db/guardrails/GuardrailsMBean.java | 61 +-
.../db/guardrails/PercentageThreshold.java | 56 ++
.../apache/cassandra/db/guardrails/Predicates.java | 93 ++++
.../apache/cassandra/db/guardrails/Threshold.java | 20 +-
.../org/apache/cassandra/gms/ApplicationState.java | 1 +
.../org/apache/cassandra/gms/VersionedValue.java | 5 +
.../cassandra/io/sstable/format/SSTableWriter.java | 2 +-
.../apache/cassandra/service/StorageService.java | 2 +
.../service/disk/usage/DiskUsageBroadcaster.java | 181 ++++++
.../service/disk/usage/DiskUsageMonitor.java | 233 ++++++++
.../service/disk/usage/DiskUsageState.java | 70 +++
.../test/guardrails/GuardrailDiskUsageTest.java | 225 ++++++++
.../cassandra/config/DataStorageSpecTest.java | 29 +-
.../db/guardrails/GuardrailCollectionSizeTest.java | 10 +-
.../db/guardrails/GuardrailDiskUsageTest.java | 617 +++++++++++++++++++++
.../cassandra/db/guardrails/GuardrailTester.java | 10 +
.../cassandra/db/guardrails/GuardrailsTest.java | 46 ++
.../cassandra/db/guardrails/ThresholdTester.java | 28 +-
.../cassandra/db/virtual/GossipInfoTableTest.java | 3 +-
35 files changed, 2125 insertions(+), 100 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index a1213090e2..9e9e1ee2f1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.1
+ * Add guardrail for data disk usage (CASSANDRA-17150)
* Tool to list data paths of existing tables (CASSANDRA-17568)
 * Migrate track_warnings to more standard naming conventions and use latest configuration types rather than long (CASSANDRA-17560)
 * Add support for CONTAINS and CONTAINS KEY in conditional UPDATE and DELETE statement (CASSANDRA-10537)
diff --git a/NEWS.txt b/NEWS.txt
index a891eb3a9a..fd31e06c93 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -56,6 +56,34 @@ using the provided 'sstableupgrade' tool.
New features
------------
+  - Added a new guardrails framework that allows defining soft/hard limits for different user actions, such as
+    limiting the number of tables, the number of columns per table or the size of collections. These guardrails are
+    only applied to regular user queries; superusers and internal queries are excluded. Reaching the soft limit
+    raises a client warning, whereas reaching the hard limit aborts the query. In both cases a log message and a
+    diagnostic event are emitted. Additionally, some guardrails are not linked to specific user queries due to
+    technical limitations, such as detecting the size of large collections during compaction or periodically
+    monitoring the disk usage. These guardrails only emit the proper logs and diagnostic events when triggered,
+    without aborting any processes. Guardrails config is defined through cassandra.yaml properties, and it can be
+    dynamically updated through the JMX MBean `org.apache.cassandra.db:type=Guardrails`. There are guardrails for:
+      - Number of user keyspaces.
+      - Number of user tables.
+      - Number of columns per table.
+      - Number of secondary indexes per table.
+      - Number of materialized views per table.
+      - Number of fields per user-defined type.
+      - Number of items in a collection.
+      - Number of partition keys selected by an IN restriction.
+      - Number of partition keys selected by the cartesian product of multiple IN restrictions.
+      - Allowed table properties.
+      - Allowed read consistency levels.
+      - Allowed write consistency levels.
+      - Collection size.
+      - Query page size.
+      - Data disk usage, defined either as a percentage or as an absolute size.
+      - Whether user-defined timestamps are allowed.
+      - Whether GROUP BY queries are allowed.
+      - Whether the creation of secondary indexes is allowed.
+      - Whether the creation of uncompressed tables is allowed.
  - Add support for the use of pure monotonic functions on the last attribute of the GROUP BY clause.
  - Add floor functions that can be used to group by time range.
  - Support for native transport rate limiting via native_transport_rate_limiting_enabled and
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index c455d45950..b28f4388f5 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1653,9 +1653,9 @@ drop_compact_storage_enabled: false
# of non-frozen collections there could be unaccounted parts of the collection on the sstables. This is done this way to
# prevent read-before-write. The guardrail is also checked at sstable write time to detect large non-frozen collections,
# although in that case exceeding the fail threshold will only log an error message, without interrupting the operation.
-# The two thresholds default to 0KiB to disable.
-# collection_size_warn_threshold: 0KiB
-# collection_size_fail_threshold: 0KiB
+# The two thresholds default to null to disable.
+# collection_size_warn_threshold:
+# collection_size_fail_threshold:
# Guardrail to warn or fail when encountering more elements in collection than threshold.
# At query time this guardrail is applied only to the collection fragment that is being written, even though in the case
# of non-frozen collections there could be unaccounted parts of the collection on the sstables. This is done this way to
@@ -1668,6 +1668,24 @@ drop_compact_storage_enabled: false
# Default -1 to disable.
# fields_per_udt_warn_threshold: -1
# fields_per_udt_fail_threshold: -1
+# Guardrail to warn or fail when local data disk usage percentage exceeds threshold. Valid values are in [1, 100].
+# This is only used for the disks storing data directories, so it won't count any separate disks used for storing
+# the commitlog, hints or saved caches. The disk usage is the ratio between the amount of space used by the data
+# directories and the sum of that same space and the remaining free space on disk. The main purpose of this
+# guardrail is to reject user writes when the disks are over the defined usage percentage, so that writes done by
+# background processes such as compaction and streaming don't fail due to a full disk. The limits should be defined
+# according to the expected data growth due to those background processes; for example, a compaction strategy that
+# doubles the size of the data would require keeping the disk usage under 50%.
+# The two thresholds default to -1 to disable.
+# data_disk_usage_percentage_warn_threshold: -1
+# data_disk_usage_percentage_fail_threshold: -1
+# Allows defining the max disk size of the data directories when calculating thresholds for
+# data_disk_usage_percentage_warn_threshold and data_disk_usage_percentage_fail_threshold, so if this is greater
+# than zero they become percentages of a fixed size on disk instead of percentages of the physically available disk
+# size. This is useful when you have a large disk and only want to use part of it for Cassandra's data directories.
+# Valid values are in [1, max available disk size of all data directories].
+# Defaults to null to disable and use the physically available disk size of data directories during calculations.
+# data_disk_usage_max_disk_size:
# Startup Checks are executed as part of the Cassandra startup process. Not all of them are configurable (i.e. can be
# disabled), but those that are configurable are enumerated below.
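
The comment block above describes the ratio compared against data_disk_usage_percentage_*: space used by the data directories divided by the sum of that used space and the remaining free space, with data_disk_usage_max_disk_size replacing the physical size when set. The following is a minimal, hypothetical sketch of that calculation; the class and method names are illustrative and this is not the actual DiskUsageMonitor code.

    // Hypothetical sketch of the documented disk usage calculation; not the actual DiskUsageMonitor code.
    final class DiskUsageExample
    {
        /**
         * @param usedBytes        space used by the data directories
         * @param freeBytes        remaining free space on the disks holding them
         * @param maxDiskSizeBytes data_disk_usage_max_disk_size in bytes, or -1 when unset
         * @return the usage percentage compared against the warn/fail thresholds
         */
        static double usagePercentage(long usedBytes, long freeBytes, long maxDiskSizeBytes)
        {
            // Documented ratio: used / (used + free); only the disks holding data directories count.
            long denominator = maxDiskSizeBytes > 0 ? maxDiskSizeBytes : usedBytes + freeBytes;
            return 100.0 * usedBytes / denominator;
        }

        public static void main(String[] args)
        {
            System.out.println(usagePercentage(400L << 30, 600L << 30, -1));         // 40.0 (% of the 1000 GiB disk)
            System.out.println(usagePercentage(400L << 30, 600L << 30, 500L << 30)); // 80.0 (% of a 500 GiB budget)
        }
    }
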
diff --git
a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
index 3b63cf8d03..72da3307dc 100644
--- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
+++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
@@ -18,6 +18,8 @@
package org.apache.cassandra.config;
+import java.util.concurrent.TimeUnit;
+
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.service.FileSystemOwnershipCheck;
@@ -265,6 +267,12 @@ public enum CassandraRelevantProperties
TEST_IGNORE_SIGAR("cassandra.test.ignore_sigar", "false"),
PAXOS_EXECUTE_ON_SELF("cassandra.paxos.use_self_execution", "true"),
+ /** property for the rate of the scheduled task that monitors disk usage */
+ DISK_USAGE_MONITOR_INTERVAL_MS("cassandra.disk_usage.monitor_interval_ms",
Long.toString(TimeUnit.SECONDS.toMillis(30))),
+
+ /** property for the interval on which the repeated client warnings and
diagnostic events about disk usage are ignored */
+ DISK_USAGE_NOTIFY_INTERVAL_MS("cassandra.disk_usage.notify_interval_ms",
Long.toString(TimeUnit.MINUTES.toMillis(30))),
+
// for specific tests
ORG_APACHE_CASSANDRA_CONF_CASSANDRA_RELEVANT_PROPERTIES_TEST("org.apache.cassandra.conf.CassandraRelevantPropertiesTest"),
ORG_APACHE_CASSANDRA_DB_VIRTUAL_SYSTEM_PROPERTIES_TABLE_TEST("org.apache.cassandra.db.virtual.SystemPropertiesTableTest"),
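
The two properties above follow the usual CassandraRelevantProperties pattern, so they can be overridden with JVM -D flags (for example -Dcassandra.disk_usage.monitor_interval_ms=10000) and read with getLong(), which is how the Guardrails static initializer later in this patch reads the notify interval. A minimal, read-only sketch under those assumptions:

    import java.util.concurrent.TimeUnit;

    import org.apache.cassandra.config.CassandraRelevantProperties;

    // Minimal sketch: reading the new disk usage properties (getLong() mirrors the usage in Guardrails.java below).
    public class DiskUsagePropertiesExample
    {
        public static void main(String[] args)
        {
            long monitorIntervalMs = CassandraRelevantProperties.DISK_USAGE_MONITOR_INTERVAL_MS.getLong();
            long notifyIntervalMs = CassandraRelevantProperties.DISK_USAGE_NOTIFY_INTERVAL_MS.getLong();

            System.out.printf("monitor every %d s, notify at most every %d min%n",
                              TimeUnit.MILLISECONDS.toSeconds(monitorIntervalMs),
                              TimeUnit.MILLISECONDS.toMinutes(notifyIntervalMs));
        }
    }
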
diff --git a/src/java/org/apache/cassandra/config/Config.java
b/src/java/org/apache/cassandra/config/Config.java
index 58cfbfddfd..7f6c926d08 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -783,24 +783,22 @@ public class Config
public volatile SubnetGroups client_error_reporting_exclusions = new
SubnetGroups();
public volatile SubnetGroups internode_error_reporting_exclusions = new
SubnetGroups();
- public static final int DISABLED_GUARDRAIL = -1;
- public static final DataStorageSpec DISABLED_SIZE_GUARDRAIL =
DataStorageSpec.inBytes(0);
- public volatile int keyspaces_warn_threshold = DISABLED_GUARDRAIL;
- public volatile int keyspaces_fail_threshold = DISABLED_GUARDRAIL;
- public volatile int tables_warn_threshold = DISABLED_GUARDRAIL;
- public volatile int tables_fail_threshold = DISABLED_GUARDRAIL;
- public volatile int columns_per_table_warn_threshold = DISABLED_GUARDRAIL;
- public volatile int columns_per_table_fail_threshold = DISABLED_GUARDRAIL;
- public volatile int secondary_indexes_per_table_warn_threshold =
DISABLED_GUARDRAIL;
- public volatile int secondary_indexes_per_table_fail_threshold =
DISABLED_GUARDRAIL;
- public volatile int materialized_views_per_table_warn_threshold =
DISABLED_GUARDRAIL;
- public volatile int materialized_views_per_table_fail_threshold =
DISABLED_GUARDRAIL;
- public volatile int page_size_warn_threshold = DISABLED_GUARDRAIL;
- public volatile int page_size_fail_threshold = DISABLED_GUARDRAIL;
- public volatile int partition_keys_in_select_warn_threshold =
DISABLED_GUARDRAIL;
- public volatile int partition_keys_in_select_fail_threshold =
DISABLED_GUARDRAIL;
- public volatile int in_select_cartesian_product_warn_threshold =
DISABLED_GUARDRAIL;
- public volatile int in_select_cartesian_product_fail_threshold =
DISABLED_GUARDRAIL;
+ public volatile int keyspaces_warn_threshold = -1;
+ public volatile int keyspaces_fail_threshold = -1;
+ public volatile int tables_warn_threshold = -1;
+ public volatile int tables_fail_threshold = -1;
+ public volatile int columns_per_table_warn_threshold = -1;
+ public volatile int columns_per_table_fail_threshold = -1;
+ public volatile int secondary_indexes_per_table_warn_threshold = -1;
+ public volatile int secondary_indexes_per_table_fail_threshold = -1;
+ public volatile int materialized_views_per_table_warn_threshold = -1;
+ public volatile int materialized_views_per_table_fail_threshold = -1;
+ public volatile int page_size_warn_threshold = -1;
+ public volatile int page_size_fail_threshold = -1;
+ public volatile int partition_keys_in_select_warn_threshold = -1;
+ public volatile int partition_keys_in_select_fail_threshold = -1;
+ public volatile int in_select_cartesian_product_warn_threshold = -1;
+ public volatile int in_select_cartesian_product_fail_threshold = -1;
public volatile Set<String> table_properties_warned =
Collections.emptySet();
public volatile Set<String> table_properties_ignored =
Collections.emptySet();
public volatile Set<String> table_properties_disallowed =
Collections.emptySet();
@@ -814,12 +812,15 @@ public class Config
public volatile boolean uncompressed_tables_enabled = true;
public volatile boolean compact_tables_enabled = true;
public volatile boolean read_before_write_list_operations_enabled = true;
- public volatile DataStorageSpec collection_size_warn_threshold =
DISABLED_SIZE_GUARDRAIL;
- public volatile DataStorageSpec collection_size_fail_threshold =
DISABLED_SIZE_GUARDRAIL;
- public volatile int items_per_collection_warn_threshold =
DISABLED_GUARDRAIL;
- public volatile int items_per_collection_fail_threshold =
DISABLED_GUARDRAIL;
- public volatile int fields_per_udt_warn_threshold = DISABLED_GUARDRAIL;
- public volatile int fields_per_udt_fail_threshold = DISABLED_GUARDRAIL;
+ public volatile DataStorageSpec collection_size_warn_threshold = null;
+ public volatile DataStorageSpec collection_size_fail_threshold = null;
+ public volatile int items_per_collection_warn_threshold = -1;
+ public volatile int items_per_collection_fail_threshold = -1;
+ public volatile int fields_per_udt_warn_threshold = -1;
+ public volatile int fields_per_udt_fail_threshold = -1;
+ public volatile int data_disk_usage_percentage_warn_threshold = -1;
+ public volatile int data_disk_usage_percentage_fail_threshold = -1;
+ public volatile DataStorageSpec data_disk_usage_max_disk_size = null;
public volatile DurationSpec streaming_state_expires =
DurationSpec.inDays(3);
public volatile DataStorageSpec streaming_state_size =
DataStorageSpec.inMebibytes(40);
diff --git a/src/java/org/apache/cassandra/config/DataStorageSpec.java
b/src/java/org/apache/cassandra/config/DataStorageSpec.java
index eeafe2ed8d..93224b3245 100644
--- a/src/java/org/apache/cassandra/config/DataStorageSpec.java
+++ b/src/java/org/apache/cassandra/config/DataStorageSpec.java
@@ -134,6 +134,17 @@ public class DataStorageSpec
return new DataStorageSpec(mebibytes, MEBIBYTES);
}
+ /**
+ * Creates a {@code DataStorageSpec} of the specified amount of gibibytes.
+ *
+ * @param gibibytes the amount of gibibytes
+ * @return a {@code DataStorageSpec}
+ */
+ public static DataStorageSpec inGibibytes(long gibibytes)
+ {
+ return new DataStorageSpec(gibibytes, GIBIBYTES);
+ }
+
/**
* @return the data storage unit.
*/
@@ -421,4 +432,4 @@ public class DataStorageSpec
throw new AbstractMethodError();
}
}
-}
\ No newline at end of file
+}
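
A small usage sketch of the new factory method; toBytes() and the String-based constructor (used by sizeFromString() in Guardrails.java) both appear elsewhere in this patch.

    import org.apache.cassandra.config.DataStorageSpec;

    // Usage sketch for the new inGibibytes factory; other factories and toBytes() are shown elsewhere in this patch.
    public class DataStorageSpecExample
    {
        public static void main(String[] args)
        {
            DataStorageSpec tenGib = DataStorageSpec.inGibibytes(10);
            System.out.println(tenGib.toBytes());            // 10737418240 (10 * 1024^3)

            // String-based construction, as used by sizeFromString() in Guardrails.java for the JMX setters.
            DataStorageSpec parsed = new DataStorageSpec("10GiB");
            System.out.println(parsed.toBytes());             // same value
        }
    }
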
diff --git a/src/java/org/apache/cassandra/config/GuardrailsOptions.java
b/src/java/org/apache/cassandra/config/GuardrailsOptions.java
index b00075ab21..b73cc195bf 100644
--- a/src/java/org/apache/cassandra/config/GuardrailsOptions.java
+++ b/src/java/org/apache/cassandra/config/GuardrailsOptions.java
@@ -18,12 +18,16 @@
package org.apache.cassandra.config;
+import java.math.BigInteger;
import java.util.Collections;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Supplier;
+import javax.annotation.Nullable;
+
import com.google.common.collect.Sets;
+import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,6 +35,7 @@ import
org.apache.cassandra.cql3.statements.schema.TableAttributes;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.guardrails.Guardrails;
import org.apache.cassandra.db.guardrails.GuardrailsConfig;
+import org.apache.cassandra.service.disk.usage.DiskUsageMonitor;
import static java.lang.String.format;
import static java.util.stream.Collectors.toSet;
@@ -74,9 +79,11 @@ public class GuardrailsOptions implements GuardrailsConfig
        config.read_consistency_levels_disallowed = validateConsistencyLevels(config.read_consistency_levels_disallowed, "read_consistency_levels_disallowed");
        config.write_consistency_levels_warned = validateConsistencyLevels(config.write_consistency_levels_warned, "write_consistency_levels_warned");
        config.write_consistency_levels_disallowed = validateConsistencyLevels(config.write_consistency_levels_disallowed, "write_consistency_levels_disallowed");
-       validateSizeThreshold(config.collection_size_warn_threshold, config.collection_size_fail_threshold, "collection_size");
+       validateSizeThreshold(config.collection_size_warn_threshold, config.collection_size_fail_threshold, false, "collection_size");
        validateIntThreshold(config.items_per_collection_warn_threshold, config.items_per_collection_fail_threshold, "items_per_collection");
        validateIntThreshold(config.fields_per_udt_warn_threshold, config.fields_per_udt_fail_threshold, "fields_per_udt");
+       validatePercentageThreshold(config.data_disk_usage_percentage_warn_threshold, config.data_disk_usage_percentage_fail_threshold, "data_disk_usage_percentage");
+       validateDataDiskUsageMaxDiskSize(config.data_disk_usage_max_disk_size);
}
@Override
@@ -460,20 +467,23 @@ public class GuardrailsOptions implements GuardrailsConfig
x ->
config.write_consistency_levels_disallowed = x);
}
+ @Override
+ @Nullable
public DataStorageSpec getCollectionSizeWarnThreshold()
{
return config.collection_size_warn_threshold;
}
@Override
+ @Nullable
public DataStorageSpec getCollectionSizeFailThreshold()
{
return config.collection_size_fail_threshold;
}
- public void setCollectionSizeThreshold(DataStorageSpec warn,
DataStorageSpec fail)
+ public void setCollectionSizeThreshold(@Nullable DataStorageSpec warn,
@Nullable DataStorageSpec fail)
{
- validateSizeThreshold(warn, fail, "collection_size");
+ validateSizeThreshold(warn, fail, false, "collection_size");
updatePropertyWithLogging("collection_size_warn_threshold",
warn,
() -> config.collection_size_warn_threshold,
@@ -534,10 +544,49 @@ public class GuardrailsOptions implements GuardrailsConfig
x -> config.fields_per_udt_fail_threshold =
x);
}
+    @Override
+    public int getDataDiskUsagePercentageWarnThreshold()
+    {
+        return config.data_disk_usage_percentage_warn_threshold;
+    }
+
+    @Override
+    public int getDataDiskUsagePercentageFailThreshold()
+    {
+        return config.data_disk_usage_percentage_fail_threshold;
+    }
+
+    public void setDataDiskUsagePercentageThreshold(int warn, int fail)
+    {
+        validatePercentageThreshold(warn, fail, "data_disk_usage_percentage");
+        updatePropertyWithLogging("data_disk_usage_percentage_warn_threshold",
+                                  warn,
+                                  () -> config.data_disk_usage_percentage_warn_threshold,
+                                  x -> config.data_disk_usage_percentage_warn_threshold = x);
+        updatePropertyWithLogging("data_disk_usage_percentage_fail_threshold",
+                                  fail,
+                                  () -> config.data_disk_usage_percentage_fail_threshold,
+                                  x -> config.data_disk_usage_percentage_fail_threshold = x);
+    }
+
+    @Override
+    public DataStorageSpec getDataDiskUsageMaxDiskSize()
+    {
+        return config.data_disk_usage_max_disk_size;
+    }
+
+    public void setDataDiskUsageMaxDiskSize(@Nullable DataStorageSpec diskSize)
+    {
+        validateDataDiskUsageMaxDiskSize(diskSize);
+        updatePropertyWithLogging("data_disk_usage_max_disk_size",
+                                  diskSize,
+                                  () -> config.data_disk_usage_max_disk_size,
+                                  x -> config.data_disk_usage_max_disk_size = x);
+    }
+
    private static <T> void updatePropertyWithLogging(String propertyName, T newValue, Supplier<T> getter, Consumer<T> setter)
    {
        T oldValue = getter.get();
-       if (!newValue.equals(oldValue))
+       if (newValue == null || !newValue.equals(oldValue))
        {
            setter.accept(newValue);
            logger.info("Updated {} from {} to {}", propertyName, oldValue, newValue);
@@ -546,7 +595,7 @@ public class GuardrailsOptions implements GuardrailsConfig
private static void validatePositiveNumeric(long value, long maxValue,
String name)
{
- if (value == Config.DISABLED_GUARDRAIL)
+ if (value == -1)
return;
if (value > maxValue)
@@ -555,14 +604,17 @@ public class GuardrailsOptions implements GuardrailsConfig
if (value == 0)
throw new IllegalArgumentException(format("Invalid value for %s: 0
is not allowed; " +
- "if attempting to
disable use %d",
- name,
Config.DISABLED_GUARDRAIL));
+ "if attempting to
disable use -1", name));
// We allow -1 as a general "disabling" flag. But reject anything
lower to avoid mistakes.
if (value <= 0)
throw new IllegalArgumentException(format("Invalid value %d for
%s: negative values are not allowed, " +
- "outside of %d which
disables the guardrail",
- value, name,
Config.DISABLED_GUARDRAIL));
+ "outside of -1 which
disables the guardrail", value, name));
+ }
+
+ private static void validatePercentage(long value, String name)
+ {
+ validatePositiveNumeric(value, 100, name);
}
private static void validateIntThreshold(int warn, int fail, String name)
@@ -572,9 +624,16 @@ public class GuardrailsOptions implements GuardrailsConfig
validateWarnLowerThanFail(warn, fail, name);
}
+ private static void validatePercentageThreshold(int warn, int fail, String
name)
+ {
+ validatePercentage(warn, name + "_warn_threshold");
+ validatePercentage(fail, name + "_fail_threshold");
+ validateWarnLowerThanFail(warn, fail, name);
+ }
+
private static void validateWarnLowerThanFail(long warn, long fail, String
name)
{
- if (warn == Config.DISABLED_GUARDRAIL || fail ==
Config.DISABLED_GUARDRAIL)
+ if (warn == -1 || fail == -1)
return;
if (fail < warn)
@@ -582,9 +641,27 @@ public class GuardrailsOptions implements GuardrailsConfig
"than the fail threshold
%d", warn, name, fail));
}
- private static void validateSizeThreshold(DataStorageSpec warn,
DataStorageSpec fail, String name)
+ private static void validateSize(DataStorageSpec size, boolean allowZero,
String name)
+ {
+ if (size == null)
+ return;
+
+ if (!allowZero && size.toBytes() == 0)
+ throw new IllegalArgumentException(format("Invalid value for %s: 0
is not allowed; " +
+ "if attempting to
disable use an empty value",
+ name));
+ }
+
+ private static void validateSizeThreshold(DataStorageSpec warn,
DataStorageSpec fail, boolean allowZero, String name)
{
- if (warn.equals(Config.DISABLED_SIZE_GUARDRAIL) ||
fail.equals(Config.DISABLED_SIZE_GUARDRAIL))
+ validateSize(warn, allowZero, name + "_warn_threshold");
+ validateSize(fail, allowZero, name + "_fail_threshold");
+ validateWarnLowerThanFail(warn, fail, name);
+ }
+
+ private static void validateWarnLowerThanFail(DataStorageSpec warn,
DataStorageSpec fail, String name)
+ {
+ if (warn == null || fail == null)
return;
if (fail.toBytes() < warn.toBytes())
@@ -615,4 +692,24 @@ public class GuardrailsOptions implements GuardrailsConfig
return consistencyLevels.isEmpty() ? Collections.emptySet() :
Sets.immutableEnumSet(consistencyLevels);
}
+
+   private static void validateDataDiskUsageMaxDiskSize(DataStorageSpec maxDiskSize)
+   {
+       if (maxDiskSize == null)
+           return;
+
+       validateSize(maxDiskSize, false, "data_disk_usage_max_disk_size");
+
+       BigInteger diskSize = DiskUsageMonitor.dataDirectoriesGroupedByFileStore()
+                                             .keys()
+                                             .stream()
+                                             .map(DiskUsageMonitor::totalSpace)
+                                             .map(BigInteger::valueOf)
+                                             .reduce(BigInteger.ZERO, BigInteger::add);
+
+       if (diskSize.compareTo(BigInteger.valueOf(maxDiskSize.toBytes())) < 0)
+           throw new IllegalArgumentException(format("Invalid value for data_disk_usage_max_disk_size: " +
+                                                     "%s specified, but only %s are actually available on disk",
+                                                     maxDiskSize, DataStorageSpec.inBytes(diskSize.longValue())));
+   }
}
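
The validation rules above boil down to: -1 disables a percentage threshold, 0 and values over 100 are rejected, the warn threshold must not exceed the fail threshold, and data_disk_usage_max_disk_size must not exceed the space actually available to the data directories. A hypothetical sketch exercising them through the public setter; it assumes a fully initialized node so that DatabaseDescriptor.getGuardrailsConfig() is usable.

    import org.apache.cassandra.config.DatabaseDescriptor;
    import org.apache.cassandra.config.GuardrailsOptions;

    // Hypothetical sketch of the validation rules above, driven through the public setter.
    public class DiskUsageThresholdValidationExample
    {
        public static void main(String[] args)
        {
            GuardrailsOptions config = DatabaseDescriptor.getGuardrailsConfig();

            config.setDataDiskUsagePercentageThreshold(80, 90);   // ok: warn <= fail, both in [1, 100]
            config.setDataDiskUsagePercentageThreshold(-1, 90);   // ok: -1 disables the warn threshold

            try
            {
                config.setDataDiskUsagePercentageThreshold(90, 80);  // rejected: fail threshold lower than warn
            }
            catch (IllegalArgumentException expected)
            {
                System.out.println(expected.getMessage());
            }

            try
            {
                config.setDataDiskUsagePercentageThreshold(0, 90);   // rejected: 0 is not allowed, use -1 to disable
            }
            catch (IllegalArgumentException expected)
            {
                System.out.println(expected.getMessage());
            }
        }
    }
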
diff --git a/src/java/org/apache/cassandra/cql3/QueryOptions.java
b/src/java/org/apache/cassandra/cql3/QueryOptions.java
index 4193c4e15a..5fcaf06786 100644
--- a/src/java/org/apache/cassandra/cql3/QueryOptions.java
+++ b/src/java/org/apache/cassandra/cql3/QueryOptions.java
@@ -24,7 +24,6 @@ import com.google.common.collect.ImmutableList;
import io.netty.buffer.ByteBuf;
-import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DataStorageSpec;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.schema.ColumnMetadata;
@@ -282,13 +281,13 @@ public abstract class QueryOptions
@Override
public long getCoordinatorReadSizeWarnThresholdBytes()
{
- return Config.DISABLED_GUARDRAIL;
+ return -1;
}
@Override
public long getCoordinatorReadSizeFailThresholdBytes()
{
- return Config.DISABLED_GUARDRAIL;
+ return -1;
}
}
@@ -299,8 +298,8 @@ public abstract class QueryOptions
public DefaultReadThresholds(DataStorageSpec warnThreshold,
DataStorageSpec abortThreshold)
{
- this.warnThresholdBytes = warnThreshold == null ?
Config.DISABLED_GUARDRAIL : warnThreshold.toBytes();
- this.abortThresholdBytes = abortThreshold == null ?
Config.DISABLED_GUARDRAIL : abortThreshold.toBytes();
+ this.warnThresholdBytes = warnThreshold == null ? -1 :
warnThreshold.toBytes();
+ this.abortThresholdBytes = abortThreshold == null ? -1 :
abortThreshold.toBytes();
}
@Override
diff --git a/src/java/org/apache/cassandra/cql3/selection/ResultSetBuilder.java
b/src/java/org/apache/cassandra/cql3/selection/ResultSetBuilder.java
index b42357835a..22566b26d7 100644
--- a/src/java/org/apache/cassandra/cql3/selection/ResultSetBuilder.java
+++ b/src/java/org/apache/cassandra/cql3/selection/ResultSetBuilder.java
@@ -21,7 +21,6 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-import org.apache.cassandra.config.Config;
import org.apache.cassandra.cql3.ResultSet;
import org.apache.cassandra.cql3.ResultSet.ResultMetadata;
import org.apache.cassandra.cql3.selection.Selection.Selectors;
@@ -76,7 +75,7 @@ public final class ResultSetBuilder
public boolean shouldWarn(long thresholdBytes)
{
-        if (thresholdBytes != Config.DISABLED_GUARDRAIL && !sizeWarningEmitted && size > thresholdBytes)
+        if (thresholdBytes != -1 && !sizeWarningEmitted && size > thresholdBytes)
{
sizeWarningEmitted = true;
return true;
@@ -86,7 +85,7 @@ public final class ResultSetBuilder
public boolean shouldReject(long thresholdBytes)
{
-        return thresholdBytes != Config.DISABLED_GUARDRAIL && size > thresholdBytes;
+        return thresholdBytes != -1 && size > thresholdBytes;
}
public long getSize()
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 89ece02c4d..61e4934864 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -411,8 +411,12 @@ public class BatchStatement implements CQLStatement
if (options.getSerialConsistency() == null)
throw new InvalidRequestException("Invalid empty serial
consistency level");
+ ClientState clientState = queryState.getClientState();
Guardrails.writeConsistencyLevels.guard(EnumSet.of(options.getConsistency(),
options.getSerialConsistency()),
- queryState.getClientState());
+ clientState);
+
+ for (int i = 0; i < statements.size(); i++ )
+ statements.get(i).validateDiskUsage(options.forStatement(i),
clientState);
if (hasConditions)
return executeWithConditions(options, queryState,
queryStartNanoTime);
@@ -420,7 +424,7 @@ public class BatchStatement implements CQLStatement
if (updatesVirtualTables)
executeInternalWithoutCondition(queryState, options,
queryStartNanoTime);
else
- executeWithoutConditions(getMutations(queryState.getClientState(),
options, false, timestamp, nowInSeconds, queryStartNanoTime),
+ executeWithoutConditions(getMutations(clientState, options, false,
timestamp, nowInSeconds, queryStartNanoTime),
options.getConsistency(),
queryStartNanoTime);
return new ResultMessage.Void();
diff --git
a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 354f739bb1..ab36ec971b 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -28,6 +28,9 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.auth.Permission;
import org.apache.cassandra.db.guardrails.Guardrails;
import org.apache.cassandra.db.marshal.ValueAccessor;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaLayout;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableMetadata;
@@ -51,6 +54,7 @@ import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.disk.usage.DiskUsageBroadcaster;
import org.apache.cassandra.service.paxos.Ballot;
import org.apache.cassandra.service.paxos.BallotGenerator;
import org.apache.cassandra.service.paxos.Commit.Proposal;
@@ -277,6 +281,25 @@ public abstract class ModificationStatement implements
CQLStatement.SingleKeyspa
Guardrails.userTimestampsEnabled.ensureEnabled(state);
}
+    public void validateDiskUsage(QueryOptions options, ClientState state)
+    {
+        // reject writes if any replica exceeds disk usage failure limit or warn if it exceeds warn limit
+        if (Guardrails.replicaDiskUsage.enabled(state) && DiskUsageBroadcaster.instance.hasStuffedOrFullNode())
+        {
+            Keyspace keyspace = Keyspace.open(keyspace());
+
+            for (ByteBuffer key : buildPartitionKeyNames(options, state))
+            {
+                Token token = metadata().partitioner.getToken(key);
+
+                for (Replica replica : ReplicaLayout.forTokenWriteLiveAndDown(keyspace, token).all())
+                {
+                    Guardrails.replicaDiskUsage.guard(replica.endpoint(), state);
+                }
+            }
+        }
+    }
+
public RegularAndStaticColumns updatedColumns()
{
return updatedColumns;
@@ -480,6 +503,8 @@ public abstract class ModificationStatement implements
CQLStatement.SingleKeyspa
else
cl.validateForWrite();
+ validateDiskUsage(options, queryState.getClientState());
+
List<? extends IMutation> mutations =
getMutations(queryState.getClientState(),
options,
diff --git a/src/java/org/apache/cassandra/db/Directories.java
b/src/java/org/apache/cassandra/db/Directories.java
index bcef41bc7d..972ba6d3cd 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -640,6 +640,11 @@ public class Directories
return availableSpace > 0 ? availableSpace : 0;
}
+ public long getRawSize()
+ {
+ return FileUtils.folderSize(location);
+ }
+
@Override
public boolean equals(Object o)
{
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java
b/src/java/org/apache/cassandra/db/ReadCommand.java
index 1efb086b40..ef70588a45 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -666,8 +666,8 @@ public abstract class ReadCommand extends AbstractReadQuery
DataStorageSpec failThreshold =
DatabaseDescriptor.getLocalReadSizeFailThreshold();
if (!shouldTrackSize(warnThreshold, failThreshold))
return iterator;
- final long warnBytes = warnThreshold == null ?
Config.DISABLED_GUARDRAIL : warnThreshold.toBytes();
- final long failBytes = failThreshold == null ?
Config.DISABLED_GUARDRAIL : failThreshold.toBytes();
+ final long warnBytes = warnThreshold == null ? -1 :
warnThreshold.toBytes();
+ final long failBytes = failThreshold == null ? -1 :
failThreshold.toBytes();
class QuerySizeTracking extends Transformation<UnfilteredRowIterator>
{
private long sizeInBytes = 0;
@@ -709,7 +709,7 @@ public abstract class ReadCommand extends AbstractReadQuery
private void addSize(long size)
{
this.sizeInBytes += size;
- if (failBytes != Config.DISABLED_GUARDRAIL && this.sizeInBytes
>= failBytes)
+ if (failBytes != -1 && this.sizeInBytes >= failBytes)
{
String msg = String.format("Query %s attempted to read %d
bytes but max allowed is %s; query aborted (see
local_read_size_fail_threshold)",
ReadCommand.this.toCQLString(),
this.sizeInBytes, failThreshold);
@@ -718,7 +718,7 @@ public abstract class ReadCommand extends AbstractReadQuery
MessageParams.add(ParamType.LOCAL_READ_SIZE_FAIL,
this.sizeInBytes);
throw new LocalReadSizeTooLargeException(msg);
}
- else if (warnBytes != Config.DISABLED_GUARDRAIL &&
this.sizeInBytes >= warnBytes)
+ else if (warnBytes != -1 && this.sizeInBytes >= warnBytes)
{
MessageParams.add(ParamType.LOCAL_READ_SIZE_WARN,
this.sizeInBytes);
}
diff --git a/src/java/org/apache/cassandra/db/guardrails/Guardrail.java
b/src/java/org/apache/cassandra/db/guardrails/Guardrail.java
index 6609760109..c058f10784 100644
--- a/src/java/org/apache/cassandra/db/guardrails/Guardrail.java
+++ b/src/java/org/apache/cassandra/db/guardrails/Guardrail.java
@@ -29,6 +29,7 @@ import
org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.NoSpamLogger;
/**
@@ -50,11 +51,32 @@ public abstract class Guardrail
/** A name identifying the guardrail (mainly for shipping with diagnostic
events). */
public final String name;
+ /** Minimum logging and triggering interval to avoid spamming downstream.
*/
+ private long minNotifyIntervalInMs = 0;
+
+ /** Time of last warning in milliseconds. */
+ private volatile long lastWarnInMs = 0;
+
+ /** Time of last failure in milliseconds. */
+ private volatile long lastFailInMs = 0;
+
Guardrail(String name)
{
this.name = name;
}
+ /**
+    * Checks whether this guardrail is enabled when the check is done for a background operation that is not
+    * associated with a specific {@link ClientState}, such as compaction or other background processes. Operations
+    * that are associated with a {@link ClientState}, such as CQL queries, should use {@link Guardrail#enabled(ClientState)}.
+    *
+    * @return {@code true} if this guardrail is enabled, {@code false} otherwise.
+ */
+ public boolean enabled()
+ {
+ return enabled(null);
+ }
+
/**
* Checks whether this guardrail is enabled or not. This will be enabled
if the database is initialized and the
* authenticated user (if specified) is not system nor superuser.
@@ -75,6 +97,9 @@ public abstract class Guardrail
protected void warn(String message, String redactedMessage)
{
+ if (skipNotifying(true))
+ return;
+
message = decorateMessage(message);
logger.warn(message);
@@ -95,13 +120,16 @@ public abstract class Guardrail
{
message = decorateMessage(message);
- logger.error(message);
- // Note that ClientWarn will simply ignore the message if we're not
running this as part of a user query
- // (the internal "state" will be null)
- ClientWarn.instance.warn(message);
- // Similarly, tracing will also ignore the message if we're not
running tracing on the current thread.
- Tracing.trace(message);
- GuardrailsDiagnostics.failed(name, decorateMessage(redactedMessage));
+ if (!skipNotifying(false))
+ {
+ logger.error(message);
+ // Note that ClientWarn will simply ignore the message if we're
not running this as part of a user query
+ // (the internal "state" will be null)
+ ClientWarn.instance.warn(message);
+ // Similarly, tracing will also ignore the message if we're not
running tracing on the current thread.
+ Tracing.trace(message);
+ GuardrailsDiagnostics.failed(name,
decorateMessage(redactedMessage));
+ }
if (state != null)
throw new GuardrailViolatedException(message);
@@ -113,4 +141,54 @@ public abstract class Guardrail
        // Add a prefix to the error message so the user knows what threw the warning or caused the failure
return String.format("Guardrail %s violated: %s", name, message);
}
+
+ /**
+    * Note: this method is not thread-safe and should only be used during guardrail initialization.
+    *
+    * @param minNotifyIntervalInMs minimum interval between logging and triggering listeners, to avoid spamming;
+    *                              the default of 0 means always log and trigger listeners.
+ * @return current guardrail
+ */
+ Guardrail minNotifyIntervalInMs(long minNotifyIntervalInMs)
+ {
+ assert minNotifyIntervalInMs >= 0;
+ this.minNotifyIntervalInMs = minNotifyIntervalInMs;
+ return this;
+ }
+
+ /**
+ * reset last notify time to make sure it will notify downstream when
{@link this#warn(String, String)}
+ * or {@link this#fail(String, ClientState)} is called next time.
+ */
+ @VisibleForTesting
+ void resetLastNotifyTime()
+ {
+ lastFailInMs = 0;
+ lastWarnInMs = 0;
+ }
+
+ /**
+ * @return true if guardrail should not log message and trigger listeners;
otherwise, update lastWarnInMs or
+ * lastFailInMs respectively.
+ */
+ private boolean skipNotifying(boolean isWarn)
+ {
+ if (minNotifyIntervalInMs == 0)
+ return false;
+
+ long nowInMs = Clock.Global.currentTimeMillis();
+ long timeElapsedInMs = nowInMs - (isWarn ? lastWarnInMs :
lastFailInMs);
+
+ boolean skip = timeElapsedInMs < minNotifyIntervalInMs;
+
+ if (!skip)
+ {
+ if (isWarn)
+ lastWarnInMs = nowInMs;
+ else
+ lastFailInMs = nowInMs;
+ }
+
+ return skip;
+ }
}
diff --git a/src/java/org/apache/cassandra/db/guardrails/Guardrails.java
b/src/java/org/apache/cassandra/db/guardrails/Guardrails.java
index cc54b279a4..d7ec81e83b 100644
--- a/src/java/org/apache/cassandra/db/guardrails/Guardrails.java
+++ b/src/java/org/apache/cassandra/db/guardrails/Guardrails.java
@@ -22,15 +22,19 @@ import java.util.Collections;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
+import javax.annotation.Nullable;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import org.apache.commons.lang3.StringUtils;
+import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.config.DataStorageSpec;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.GuardrailsOptions;
import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.service.disk.usage.DiskUsageBroadcaster;
import org.apache.cassandra.utils.MBeanWrapper;
import static java.lang.String.format;
@@ -42,7 +46,7 @@ public final class Guardrails implements GuardrailsMBean
{
public static final String MBEAN_NAME =
"org.apache.cassandra.db:type=Guardrails";
- private static final GuardrailsConfigProvider CONFIG_PROVIDER =
GuardrailsConfigProvider.instance;
+ public static final GuardrailsConfigProvider CONFIG_PROVIDER =
GuardrailsConfigProvider.instance;
private static final GuardrailsOptions DEFAULT_CONFIG =
DatabaseDescriptor.getGuardrailsConfig();
@VisibleForTesting
@@ -201,11 +205,11 @@ public final class Guardrails implements GuardrailsMBean
state ->
CONFIG_PROVIDER.getOrCreate(state).getInSelectCartesianProductWarnThreshold(),
state ->
CONFIG_PROVIDER.getOrCreate(state).getInSelectCartesianProductFailThreshold(),
(isWarning, what, value, threshold) ->
- isWarning ? format("The cartesian product of the IN
restrictions on %s produces %d values, " +
+ isWarning ? format("The cartesian product of the IN
restrictions on %s produces %s values, " +
"this exceeds warning threshold of %s.",
what, value, threshold)
: format("Aborting query because the cartesian
product of the IN restrictions on %s " +
- "produces %d values, this exceeds fail
threshold of %s.",
+ "produces %s values, this exceeds fail
threshold of %s.",
what, value, threshold));
/**
@@ -233,8 +237,8 @@ public final class Guardrails implements GuardrailsMBean
*/
public static final Threshold collectionSize =
new Threshold("collection_size",
- state ->
CONFIG_PROVIDER.getOrCreate(state).getCollectionSizeWarnThreshold().toBytes(),
- state ->
CONFIG_PROVIDER.getOrCreate(state).getCollectionSizeFailThreshold().toBytes(),
+ state ->
sizeToBytes(CONFIG_PROVIDER.getOrCreate(state).getCollectionSizeWarnThreshold()),
+ state ->
sizeToBytes(CONFIG_PROVIDER.getOrCreate(state).getCollectionSizeFailThreshold()),
(isWarning, what, value, threshold) ->
isWarning ? format("Detected collection %s of size %s, this
exceeds the warning threshold of %s.",
what, value, threshold)
@@ -267,6 +271,42 @@ public final class Guardrails implements GuardrailsMBean
: format("User types cannot have more than %s
columns, but %s provided for user type %s.",
threshold, value, what));
+ /**
+ * Guardrail on the data disk usage on the local node, used by a periodic
task to calculate and propagate that status.
+ * See {@link org.apache.cassandra.service.disk.usage.DiskUsageMonitor}
and {@link DiskUsageBroadcaster}.
+ */
+ public static final PercentageThreshold localDataDiskUsage =
+ new PercentageThreshold("local_data_disk_usage",
+ state ->
CONFIG_PROVIDER.getOrCreate(state).getDataDiskUsagePercentageWarnThreshold(),
+ state ->
CONFIG_PROVIDER.getOrCreate(state).getDataDiskUsagePercentageFailThreshold(),
+ (isWarning, what, value, threshold) ->
+ isWarning ? format("Local data disk usage %s(%s)
exceeds warning threshold of %s",
+ value, what, threshold)
+ : format("Local data disk usage %s(%s)
exceeds failure threshold of %s, " +
+ "will stop accepting writes",
+ value, what, threshold));
+
+ /**
+ * Guardrail on the data disk usage on replicas, used at write time to
verify the status of the involved replicas.
+ * See {@link org.apache.cassandra.service.disk.usage.DiskUsageMonitor}
and {@link DiskUsageBroadcaster}.
+ */
+ public static final Predicates<InetAddressAndPort> replicaDiskUsage =
+ new Predicates<>("replica_disk_usage",
+ state -> DiskUsageBroadcaster.instance::isStuffed,
+ state -> DiskUsageBroadcaster.instance::isFull,
+ // not using `value` because it represents replica
address which should be hidden from client.
+ (isWarning, value) ->
+ isWarning ? "Replica disk usage exceeds warning threshold"
+ : "Write request failed because disk usage
exceeds failure threshold");
+
+ static
+ {
+ // Avoid spamming with notifications about stuffed/full disks
+ long minNotifyInterval =
CassandraRelevantProperties.DISK_USAGE_NOTIFY_INTERVAL_MS.getLong();
+ localDataDiskUsage.minNotifyIntervalInMs(minNotifyInterval);
+ replicaDiskUsage.minNotifyIntervalInMs(minNotifyInterval);
+ }
+
private Guardrails()
{
MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
@@ -557,22 +597,24 @@ public final class Guardrails implements GuardrailsMBean
DEFAULT_CONFIG.setPartitionKeysInSelectThreshold(warn, fail);
}
- public long getCollectionSizeWarnThresholdInKiB()
+ @Override
+ @Nullable
+ public String getCollectionSizeWarnThreshold()
{
- return DEFAULT_CONFIG.getCollectionSizeWarnThreshold().toKibibytes();
+ return sizeToString(DEFAULT_CONFIG.getCollectionSizeWarnThreshold());
}
@Override
- public long getCollectionSizeFailThresholdInKiB()
+ @Nullable
+ public String getCollectionSizeFailThreshold()
{
- return DEFAULT_CONFIG.getCollectionSizeFailThreshold().toKibibytes();
+ return sizeToString(DEFAULT_CONFIG.getCollectionSizeFailThreshold());
}
@Override
- public void setCollectionSizeThresholdInKiB(long warnInKiB, long failInKiB)
+ public void setCollectionSizeThreshold(@Nullable String warnSize,
@Nullable String failSize)
{
-
DEFAULT_CONFIG.setCollectionSizeThreshold(DataStorageSpec.inKibibytes(warnInKiB),
-
DataStorageSpec.inKibibytes(failInKiB));
+ DEFAULT_CONFIG.setCollectionSizeThreshold(sizeFromString(warnSize),
sizeFromString(failSize));
}
@Override
@@ -725,6 +767,37 @@ public final class Guardrails implements GuardrailsMBean
DEFAULT_CONFIG.setFieldsPerUDTThreshold(warn, fail);
}
+ @Override
+ public int getDataDiskUsagePercentageWarnThreshold()
+ {
+ return DEFAULT_CONFIG.getDataDiskUsagePercentageWarnThreshold();
+ }
+
+ @Override
+ public int getDataDiskUsagePercentageFailThreshold()
+ {
+ return DEFAULT_CONFIG.getDataDiskUsagePercentageFailThreshold();
+ }
+
+ @Override
+ public void setDataDiskUsagePercentageThreshold(int warn, int fail)
+ {
+ DEFAULT_CONFIG.setDataDiskUsagePercentageThreshold(warn, fail);
+ }
+
+ @Override
+ @Nullable
+ public String getDataDiskUsageMaxDiskSize()
+ {
+ return sizeToString(DEFAULT_CONFIG.getDataDiskUsageMaxDiskSize());
+ }
+
+ @Override
+ public void setDataDiskUsageMaxDiskSize(@Nullable String size)
+ {
+ DEFAULT_CONFIG.setDataDiskUsageMaxDiskSize(sizeFromString(size));
+ }
+
private static String toCSV(Set<String> values)
{
return values == null || values.isEmpty() ? "" : String.join(",",
values);
@@ -758,4 +831,19 @@ public final class Guardrails implements GuardrailsMBean
return null;
return
set.stream().map(ConsistencyLevel::valueOf).collect(Collectors.toSet());
}
+
+ private static Long sizeToBytes(@Nullable DataStorageSpec size)
+ {
+ return size == null ? -1 : size.toBytes();
+ }
+
+ private static String sizeToString(@Nullable DataStorageSpec size)
+ {
+ return size == null ? null : size.toString();
+ }
+
+ private static DataStorageSpec sizeFromString(@Nullable String size)
+ {
+ return StringUtils.isEmpty(size) ? null : new DataStorageSpec(size);
+ }
}
diff --git a/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java
b/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java
index 08b3d5677b..562509b7dd 100644
--- a/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java
+++ b/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.db.guardrails;
import java.util.Set;
+import javax.annotation.Nullable;
+
import org.apache.cassandra.config.DataStorageSpec;
import org.apache.cassandra.db.ConsistencyLevel;
@@ -200,14 +202,16 @@ public interface GuardrailsConfig
*/
Set<ConsistencyLevel> getWriteConsistencyLevelsDisallowed();
- /*
+ /**
* @return The threshold to warn when encountering a collection with
larger data size than threshold.
*/
+ @Nullable
DataStorageSpec getCollectionSizeWarnThreshold();
/**
* @return The threshold to prevent collections with larger data size than
threshold.
*/
+ @Nullable
DataStorageSpec getCollectionSizeFailThreshold();
/**
@@ -229,4 +233,23 @@ public interface GuardrailsConfig
* @return The threshold to fail when creating a UDT with more fields than
threshold.
*/
int getFieldsPerUDTFailThreshold();
+
+ /**
+ * @return The threshold to warn when local disk usage percentage exceeds
that threshold.
+ * Allowed values are in the range {@code [1, 100]}, and -1 means disabled.
+ */
+ int getDataDiskUsagePercentageWarnThreshold();
+
+ /**
+ * @return The threshold to fail when local disk usage percentage exceeds
that threshold.
+ * Allowed values are in the range {@code [1, 100]}, and -1 means disabled.
+ */
+ int getDataDiskUsagePercentageFailThreshold();
+
+ /**
+ * @return The max disk size of the data directories when calculating disk
usage thresholds, {@code null} means
+ * disabled.
+ */
+ @Nullable
+ DataStorageSpec getDataDiskUsageMaxDiskSize();
}
diff --git a/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java
b/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java
index 771db67ce6..215b953b89 100644
--- a/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java
+++ b/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java
@@ -19,6 +19,7 @@
package org.apache.cassandra.db.guardrails;
import java.util.Set;
+import javax.annotation.Nullable;
/**
* JMX entrypoint for updating the default guardrails configuration parsed
from {@code cassandra.yaml}.
@@ -110,6 +111,7 @@ public interface GuardrailsMBean
/**
* Enables or disables the ability to create secondary indexes
+ *
* @param enabled
*/
void setSecondaryIndexesEnabled(boolean enabled);
@@ -401,20 +403,30 @@ public interface GuardrailsMBean
void setWriteConsistencyLevelsDisallowedCSV(String consistencyLevels);
/**
-    * @return The threshold to warn when encountering larger size of collection data than threshold, in KiB.
+    * @return The threshold to warn when encountering larger size of collection data than threshold, as a string
+    * formatted as in, for example, {@code 10GiB}, {@code 20MiB}, {@code 30KiB} or {@code 40B}. A {@code null} value
+    * means that the threshold is disabled.
     */
-   long getCollectionSizeWarnThresholdInKiB();
+   @Nullable
+   String getCollectionSizeWarnThreshold();

    /**
-    * @return The threshold to prevent collections with larger data size than threshold, in KiB.
+    * @return The threshold to prevent collections with larger data size than threshold, as a string formatted as in,
+    * for example, {@code 10GiB}, {@code 20MiB}, {@code 30KiB} or {@code 40B}. A {@code null} value means that the
+    * threshold is disabled.
     */
-   long getCollectionSizeFailThresholdInKiB();
+   @Nullable
+   String getCollectionSizeFailThreshold();

    /**
-    * @param warnInKiB The threshold to warn when encountering larger size of collection data than threshold, in KiB.
-    * @param failInKiB The threshold to prevent collections with larger data size than threshold, in KiB.
+    * @param warnSize The threshold to warn when encountering larger size of collection data than threshold, as a
+    *                 string formatted as in, for example, {@code 10GiB}, {@code 20MiB}, {@code 30KiB} or {@code 40B}.
+    *                 A {@code null} value means disabled.
+    * @param failSize The threshold to prevent collections with larger data size than threshold, as a string formatted
+    *                 as in, for example, {@code 10GiB}, {@code 20MiB}, {@code 30KiB} or {@code 40B}. A {@code null}
+    *                 value means disabled.
     */
-   void setCollectionSizeThresholdInKiB(long warnInKiB, long failInKiB);
+   void setCollectionSizeThreshold(@Nullable String warnSize, @Nullable String failSize);
/**
* @return The threshold to warn when encountering more elements in a
collection than threshold.
@@ -447,4 +459,39 @@ public interface GuardrailsMBean
* @param fail The threshold to prevent creating a UDT with more fields
than threshold. -1 means disabled.
*/
void setFieldsPerUDTThreshold(int warn, int fail);
+
+ /**
+ * @return The threshold to warn when local data disk usage percentage
exceeds that threshold.
+ * Allowed values are in the range {@code [1, 100]}, and -1 means disabled.
+ */
+ int getDataDiskUsagePercentageWarnThreshold();
+
+ /**
+ * @return The threshold to fail when local data disk usage percentage
exceeds that threshold.
+ * Allowed values are in the range {@code [1, 100]}, and -1 means disabled.
+ */
+ int getDataDiskUsagePercentageFailThreshold();
+
+ /**
+ * @param warn The threshold to warn when local disk usage percentage
exceeds that threshold.
+ * Allowed values are in the range {@code [1, 100]}, and -1
means disabled.
+ * @param fail The threshold to fail when local disk usage percentage
exceeds that threshold.
+ * Allowed values are in the range {@code [1, 100]}, and -1
means disabled.
+ */
+   void setDataDiskUsagePercentageThreshold(int warn, int fail);
+
+ /**
+ * @return The max disk size of the data directories when calculating disk
usage thresholds, as a string formatted
+ * as in, for example, {@code 10GiB}, {@code 20MiB}, {@code 30KiB} or
{@code 40B}. A {@code null} value means
+ * disabled.
+ */
+ @Nullable
+ String getDataDiskUsageMaxDiskSize();
+
+ /**
+ * @param size The max disk size of the data directories when calculating
disk usage thresholds, as a string
+ * formatted as in, for example, {@code 10GiB}, {@code 20MiB},
{@code 30KiB} or {@code 40B}.
+ * A {@code null} value means disabled.
+ */
+ void setDataDiskUsageMaxDiskSize(@Nullable String size);
}
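
Since GuardrailsMBean is the JMX entrypoint, the new disk usage settings can be changed at runtime with a plain JMX client. The sketch below uses only standard javax.management APIs and the methods declared in the interface above; the host and port are assumptions (7199 is the usual Cassandra JMX port), and the object name comes from Guardrails.MBEAN_NAME earlier in this patch.

    import javax.management.JMX;
    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    import org.apache.cassandra.db.guardrails.GuardrailsMBean;

    // Client-side sketch: updating the disk usage guardrails at runtime through JMX.
    // The service URL and port are assumptions; adjust them for your deployment.
    public class GuardrailsJmxExample
    {
        public static void main(String[] args) throws Exception
        {
            JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://127.0.0.1:7199/jmxrmi");
            try (JMXConnector connector = JMXConnectorFactory.connect(url, null))
            {
                MBeanServerConnection connection = connector.getMBeanServerConnection();
                ObjectName name = new ObjectName("org.apache.cassandra.db:type=Guardrails");
                GuardrailsMBean guardrails = JMX.newMBeanProxy(connection, name, GuardrailsMBean.class);

                // Warn at 80% data disk usage, reject writes at 90%.
                guardrails.setDataDiskUsagePercentageThreshold(80, 90);
                // Compute those percentages against a fixed 500GiB budget instead of the whole physical disk
                // (the value must not exceed the space actually available to the data directories).
                guardrails.setDataDiskUsageMaxDiskSize("500GiB");
                // Size-based thresholds are passed as strings such as "10GiB", "20MiB", "30KiB" or "40B".
                guardrails.setCollectionSizeThreshold("10MiB", "20MiB");
            }
        }
    }
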
diff --git
a/src/java/org/apache/cassandra/db/guardrails/PercentageThreshold.java
b/src/java/org/apache/cassandra/db/guardrails/PercentageThreshold.java
new file mode 100644
index 0000000000..c08d02641b
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/guardrails/PercentageThreshold.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.guardrails;
+
+import java.util.function.ToLongFunction;
+
+import org.apache.cassandra.service.ClientState;
+
+/**
+ * A {@link Threshold} guardrail whose values represent a percentage.
+ * <p>
+ * This works exactly as a {@link Threshold}, but provides slightly more convenient error messages for percentages.
+ */
+public class PercentageThreshold extends Threshold
+{
+ /**
+ * Creates a new threshold guardrail.
+ *
+ * @param name the identifying name of the guardrail
+ * @param warnThreshold a {@link ClientState}-based provider of the
value above which a warning should be triggered.
+ * @param failThreshold a {@link ClientState}-based provider of the
value above which the operation should be aborted.
+ * @param messageProvider a function to generate the warning or error
message if the guardrail is triggered
+ */
+ public PercentageThreshold(String name,
+ ToLongFunction<ClientState> warnThreshold,
+ ToLongFunction<ClientState> failThreshold,
+ ErrorMessageProvider messageProvider)
+ {
+ super(name, warnThreshold, failThreshold, messageProvider);
+ }
+
+ @Override
+ protected String errMsg(boolean isWarning, String what, long value, long
thresholdValue)
+ {
+ return messageProvider.createMessage(isWarning,
+ what,
+ String.format("%d%%", value),
+ String.format("%d%%",
thresholdValue));
+ }
+}
diff --git a/src/java/org/apache/cassandra/db/guardrails/Predicates.java
b/src/java/org/apache/cassandra/db/guardrails/Predicates.java
new file mode 100644
index 0000000000..13be9e9302
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/guardrails/Predicates.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.guardrails;
+
+import java.util.function.Function;
+import java.util.function.Predicate;
+import javax.annotation.Nullable;
+
+import org.apache.cassandra.service.ClientState;
+
+/**
+ * A guardrail based on two predicates.
+ *
+ * <p>A {@link Predicates} guardrail defines (up to) two predicates: one at which a warning is issued, and another
+ * at which a failure is triggered. If the failure predicate is triggered, the warning is skipped.
+ *
+ * @param <T> the type of the values to be tested against the predicates.
+ */
+public class Predicates<T> extends Guardrail
+{
+ private final Function<ClientState, Predicate<T>> warnPredicate;
+ private final Function<ClientState, Predicate<T>> failurePredicate;
+ private final MessageProvider<T> messageProvider;
+
+ /**
+ * A function used to build the warning or error message of a triggered
{@link Predicates} guardrail.
+ */
+ public interface MessageProvider<T>
+ {
+ /**
+        * Called when the guardrail is triggered to build the corresponding message.
+        *
+        * @param isWarning whether the triggered predicate is the warning one; otherwise it is the failure one.
+        * @param value     the value that triggered the guardrail.
+ */
+ String createMessage(boolean isWarning, T value);
+ }
+
+ /**
+ * Creates a new {@link Predicates} guardrail.
+ *
+ * @param name the identifying name of the guardrail
+     * @param warnPredicate    a {@link ClientState}-based predicate provider used to check if a given value should trigger a warning.
+     * @param failurePredicate a {@link ClientState}-based predicate provider used to check if a given value should trigger a failure.
+ * @param messageProvider a function to generate the warning or error
message if the guardrail is triggered
+ */
+ Predicates(String name,
+ Function<ClientState, Predicate<T>> warnPredicate,
+ Function<ClientState, Predicate<T>> failurePredicate,
+ MessageProvider<T> messageProvider)
+ {
+ super(name);
+ this.warnPredicate = warnPredicate;
+ this.failurePredicate = failurePredicate;
+ this.messageProvider = messageProvider;
+ }
+
+ /**
+ * Apply the guardrail to the provided value, triggering a warning or failure if appropriate.
+ *
+ * @param value the value to check.
+ */
+ public void guard(T value, @Nullable ClientState state)
+ {
+ if (!enabled(state))
+ return;
+
+ if (failurePredicate.apply(state).test(value))
+ {
+ fail(messageProvider.createMessage(false, value), state);
+ }
+ else if (warnPredicate.apply(state).test(value))
+ {
+ warn(messageProvider.createMessage(true, value));
+ }
+ }
+}
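A hedged usage sketch of the class above (the constructor is package-private, so this would live in the same package; the values and messages are illustrative, not taken from the patch):

    // Warn above 10, fail above 100; a failure short-circuits the warning.
    Predicates<Integer> example = new Predicates<>("example",
                                                   state -> v -> v > 10,
                                                   state -> v -> v > 100,
                                                   (isWarning, value) ->
                                                       (isWarning ? "Warning: " : "Aborting: ") + value);

    // Assuming the guardrail is enabled for the given client state:
    example.guard(5, null);    // passes silently
    example.guard(50, null);   // warns
    example.guard(500, null);  // fails, and the warning predicate is never consulted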
diff --git a/src/java/org/apache/cassandra/db/guardrails/Threshold.java
b/src/java/org/apache/cassandra/db/guardrails/Threshold.java
index f88d1e0cbe..f7e4823448 100644
--- a/src/java/org/apache/cassandra/db/guardrails/Threshold.java
+++ b/src/java/org/apache/cassandra/db/guardrails/Threshold.java
@@ -35,7 +35,7 @@ public class Threshold extends Guardrail
{
private final ToLongFunction<ClientState> warnThreshold;
private final ToLongFunction<ClientState> failThreshold;
- private final ErrorMessageProvider messageProvider;
+ protected final ErrorMessageProvider messageProvider;
/**
* Creates a new threshold guardrail.
@@ -56,12 +56,12 @@ public class Threshold extends Guardrail
this.messageProvider = messageProvider;
}
- private String errMsg(boolean isWarning, String what, long value, long thresholdValue)
+ protected String errMsg(boolean isWarning, String what, long value, long thresholdValue)
{
return messageProvider.createMessage(isWarning,
what,
- value,
- thresholdValue);
+ Long.toString(value),
+ Long.toString(thresholdValue));
}
private String redactedErrMsg(boolean isWarning, long value, long thresholdValue)
@@ -108,6 +108,16 @@ public class Threshold extends Guardrail
return enabled(state) && (value > Math.min(failValue(state), warnValue(state)));
}
+ public boolean warnsOn(long value, @Nullable ClientState state)
+ {
+ return enabled(state) && (value > warnValue(state) && value <= failValue(state));
+ }
+
+ public boolean failsOn(long value, @Nullable ClientState state)
+ {
+ return enabled(state) && (value > failValue(state));
+ }
+
/**
* Apply the guardrail to the provided value, warning or failing if appropriate.
*
@@ -163,6 +173,6 @@ public class Threshold extends Guardrail
* @param value The value that triggered the guardrail (as a string).
* @param threshold The threshold that was passed to trigger the guardrail (as a string).
*/
- String createMessage(boolean isWarning, String what, long value, long threshold);
+ String createMessage(boolean isWarning, String what, String value, String threshold);
}
}
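The two new helpers split the old single check into bands; a quick illustration in comment form, with warn/fail limits of 50 and 90 chosen purely for the example:

    // warnValue = 50, failValue = 90 (illustrative numbers, not defaults):
    // threshold.warnsOn(50, state)  -> false  (the value must be strictly above the warn limit)
    // threshold.warnsOn(51, state)  -> true
    // threshold.warnsOn(90, state)  -> true   (still inside the warn band, inclusive of the fail limit)
    // threshold.failsOn(90, state)  -> false
    // threshold.failsOn(91, state)  -> true
    // Both return false whenever the guardrail is disabled for the given state.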
diff --git a/src/java/org/apache/cassandra/gms/ApplicationState.java
b/src/java/org/apache/cassandra/gms/ApplicationState.java
index 4e20d62048..c45d3c2602 100644
--- a/src/java/org/apache/cassandra/gms/ApplicationState.java
+++ b/src/java/org/apache/cassandra/gms/ApplicationState.java
@@ -56,6 +56,7 @@ public enum ApplicationState
* a comma-separated list.
**/
SSTABLE_VERSIONS,
+ DISK_USAGE,
// DO NOT EDIT OR REMOVE PADDING STATES BELOW - only add new states above. See CASSANDRA-16484
X1,
X2,
diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java
b/src/java/org/apache/cassandra/gms/VersionedValue.java
index 63e9487d7c..26644e17cc 100644
--- a/src/java/org/apache/cassandra/gms/VersionedValue.java
+++ b/src/java/org/apache/cassandra/gms/VersionedValue.java
@@ -172,6 +172,11 @@ public class VersionedValue implements
Comparable<VersionedValue>
return new VersionedValue(String.valueOf(load));
}
+ public VersionedValue diskUsage(String state)
+ {
+ return new VersionedValue(state);
+ }
+
public VersionedValue schema(UUID newVersion)
{
return new VersionedValue(newVersion.toString());
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
index 93447c2f44..186ee5abc4 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
@@ -398,7 +398,7 @@ public abstract class SSTableWriter extends SSTable
implements Transactional
public static void guardCollectionSize(TableMetadata metadata, DecoratedKey partitionKey, Unfiltered unfiltered)
{
- if (!Guardrails.collectionSize.enabled(null) && !Guardrails.itemsPerCollection.enabled(null))
+ if (!Guardrails.collectionSize.enabled() && !Guardrails.itemsPerCollection.enabled())
return;
if (!unfiltered.isRow() || SchemaConstants.isSystemKeyspace(metadata.keyspace))
diff --git a/src/java/org/apache/cassandra/service/StorageService.java
b/src/java/org/apache/cassandra/service/StorageService.java
index 0e8b2570fe..6562635dca 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -72,6 +72,7 @@ import
org.apache.cassandra.fql.FullQueryLoggerOptionsCompositeData;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.locator.ReplicaCollection.Builder.Conflict;
import org.apache.cassandra.schema.Keyspaces;
+import org.apache.cassandra.service.disk.usage.DiskUsageBroadcaster;
import org.apache.cassandra.utils.concurrent.Future;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.utils.concurrent.FutureCombiner;
@@ -1034,6 +1035,7 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
gossipSnitchInfo();
Schema.instance.startSync();
LoadBroadcaster.instance.startBroadcasting();
+ DiskUsageBroadcaster.instance.startBroadcasting();
HintsService.instance.startDispatch();
BatchlogManager.instance.start();
snapshotManager.start();
diff --git
a/src/java/org/apache/cassandra/service/disk/usage/DiskUsageBroadcaster.java
b/src/java/org/apache/cassandra/service/disk/usage/DiskUsageBroadcaster.java
new file mode 100644
index 0000000000..4504ac7f62
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/disk/usage/DiskUsageBroadcaster.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.disk.usage;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.EndpointState;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
+import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.NoSpamLogger;
+
+/**
+ * Starts {@link DiskUsageMonitor} to monitor the local disk usage state and broadcast any state change via Gossip.
+ * At the same time, it caches the disk usage states of the rest of the cluster as received via Gossip.
+ */
+public class DiskUsageBroadcaster implements IEndpointStateChangeSubscriber
+{
+ private static final Logger logger = LoggerFactory.getLogger(DiskUsageBroadcaster.class);
+ private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 10, TimeUnit.MINUTES);
+
+ public static final DiskUsageBroadcaster instance = new DiskUsageBroadcaster(DiskUsageMonitor.instance);
+
+ private final DiskUsageMonitor monitor;
+ private final ConcurrentMap<InetAddressAndPort, DiskUsageState> usageInfo = new ConcurrentHashMap<>();
+ private volatile boolean hasStuffedOrFullNode = false;
+
+ @VisibleForTesting
+ public DiskUsageBroadcaster(DiskUsageMonitor monitor)
+ {
+ this.monitor = monitor;
+ Gossiper.instance.register(this);
+ }
+
+ /**
+ * @return {@code true} if any node in the cluster is STUFFED OR FULL
+ */
+ public boolean hasStuffedOrFullNode()
+ {
+ return hasStuffedOrFullNode;
+ }
+
+ /**
+ * @return {@code true} if given node's disk usage is FULL
+ */
+ public boolean isFull(InetAddressAndPort endpoint)
+ {
+ return state(endpoint).isFull();
+ }
+
+ /**
+ * @return {@code true} if given node's disk usage is STUFFED
+ */
+ public boolean isStuffed(InetAddressAndPort endpoint)
+ {
+ return state(endpoint).isStuffed();
+ }
+
+ @VisibleForTesting
+ public DiskUsageState state(InetAddressAndPort endpoint)
+ {
+ return usageInfo.getOrDefault(endpoint, DiskUsageState.NOT_AVAILABLE);
+ }
+
+ public void startBroadcasting()
+ {
+ monitor.start(newState -> {
+
+ if (logger.isTraceEnabled())
+ logger.trace("Disseminating disk usage info: {}", newState);
+
+ Gossiper.instance.addLocalApplicationState(ApplicationState.DISK_USAGE,
+ StorageService.instance.valueFactory.diskUsage(newState.name()));
+ });
+ }
+
+ @Override
+ public void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value)
+ {
+ if (state != ApplicationState.DISK_USAGE)
+ return;
+
+ DiskUsageState usageState = DiskUsageState.NOT_AVAILABLE;
+ try
+ {
+ usageState = DiskUsageState.valueOf(value.value);
+ }
+ catch (IllegalArgumentException e)
+ {
+ noSpamLogger.warn(String.format("Found unknown DiskUsageState: %s. Using default state %s instead.",
+ value.value, usageState));
+ }
+ usageInfo.put(endpoint, usageState);
+
+ hasStuffedOrFullNode = usageState.isStuffedOrFull() || computeHasStuffedOrFullNode();
+ }
+
+ private boolean computeHasStuffedOrFullNode()
+ {
+ for (DiskUsageState replicaState : usageInfo.values())
+ {
+ if (replicaState.isStuffedOrFull())
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public void onJoin(InetAddressAndPort endpoint, EndpointState epState)
+ {
+ updateDiskUsage(endpoint, epState);
+ }
+
+ @Override
+ public void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue)
+ {
+ // nothing to do here
+ }
+
+ @Override
+ public void onAlive(InetAddressAndPort endpoint, EndpointState state)
+ {
+ updateDiskUsage(endpoint, state);
+ }
+
+ @Override
+ public void onDead(InetAddressAndPort endpoint, EndpointState state)
+ {
+ // do nothing, as we don't care about dead nodes
+ }
+
+ @Override
+ public void onRestart(InetAddressAndPort endpoint, EndpointState state)
+ {
+ updateDiskUsage(endpoint, state);
+ }
+
+ @Override
+ public void onRemove(InetAddressAndPort endpoint)
+ {
+ usageInfo.remove(endpoint);
+ hasStuffedOrFullNode = usageInfo.values().stream().anyMatch(DiskUsageState::isStuffedOrFull);
+ }
+
+ private void updateDiskUsage(InetAddressAndPort endpoint, EndpointState state)
+ {
+ VersionedValue localValue = state.getApplicationState(ApplicationState.DISK_USAGE);
+
+ if (localValue != null)
+ {
+ onChange(endpoint, ApplicationState.DISK_USAGE, localValue);
+ }
+ }
+}
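For orientation, a sketch (not from the patch) of how a caller on the write path can consult the broadcaster's cached view of the cluster; the endpoint and the reactions are illustrative, and getByName throws a checked UnknownHostException:

    InetAddressAndPort replica = InetAddressAndPort.getByName("127.0.0.1");

    // Cheap cluster-wide check first: nothing to do while every known node is SPACIOUS or unknown.
    if (DiskUsageBroadcaster.instance.hasStuffedOrFullNode())
    {
        if (DiskUsageBroadcaster.instance.isFull(replica))
        {
            // a write replica is FULL: the replica disk usage guardrail would reject the write
        }
        else if (DiskUsageBroadcaster.instance.isStuffed(replica))
        {
            // a write replica is STUFFED: the replica disk usage guardrail would emit a warning
        }
    }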
diff --git
a/src/java/org/apache/cassandra/service/disk/usage/DiskUsageMonitor.java
b/src/java/org/apache/cassandra/service/disk/usage/DiskUsageMonitor.java
new file mode 100644
index 0000000000..7395c5fb3b
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/disk/usage/DiskUsageMonitor.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.disk.usage;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.math.RoundingMode;
+import java.nio.file.FileStore;
+import java.nio.file.Files;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.config.DataStorageSpec;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.Memtable;
+import org.apache.cassandra.db.guardrails.Guardrails;
+import org.apache.cassandra.db.guardrails.GuardrailsConfig;
+import org.apache.cassandra.io.util.FileUtils;
+
+/**
+ * Schedules a periodic task to monitor local disk usage and notify {@link DiskUsageBroadcaster} if the local state changes.
+ */
+public class DiskUsageMonitor
+{
+ private static final Logger logger = LoggerFactory.getLogger(DiskUsageMonitor.class);
+
+ public static DiskUsageMonitor instance = new DiskUsageMonitor();
+
+ private final Supplier<GuardrailsConfig> guardrailsConfigSupplier = () -> Guardrails.CONFIG_PROVIDER.getOrCreate(null);
+ private final Supplier<Multimap<FileStore, Directories.DataDirectory>> dataDirectoriesSupplier;
+
+ private volatile DiskUsageState localState = DiskUsageState.NOT_AVAILABLE;
+
+ @VisibleForTesting
+ public DiskUsageMonitor()
+ {
+ this.dataDirectoriesSupplier = DiskUsageMonitor::dataDirectoriesGroupedByFileStore;
+ }
+
+ @VisibleForTesting
+ public DiskUsageMonitor(Supplier<Multimap<FileStore, Directories.DataDirectory>> dataDirectoriesSupplier)
+ {
+ this.dataDirectoriesSupplier = dataDirectoriesSupplier;
+ }
+
+ /**
+ * Start monitoring local disk usage and call the notifier when the local disk usage state changes.
+ */
+ public void start(Consumer<DiskUsageState> notifier)
+ {
+ // start the scheduler regardless of whether the guardrail is enabled, so we can enable it later without a restart
+ ScheduledExecutors.scheduledTasks.scheduleAtFixedRate(() -> {
+
+ if (!Guardrails.localDataDiskUsage.enabled(null))
+ return;
+
+ updateLocalState(getDiskUsage(), notifier);
+ }, 0, CassandraRelevantProperties.DISK_USAGE_MONITOR_INTERVAL_MS.getLong(), TimeUnit.MILLISECONDS);
+ }
+
+ @VisibleForTesting
+ public void updateLocalState(double usageRatio, Consumer<DiskUsageState> notifier)
+ {
+ double percentage = usageRatio * 100;
+ long percentageCeiling = (long) Math.ceil(percentage);
+
+ DiskUsageState state = getState(percentageCeiling);
+
+ Guardrails.localDataDiskUsage.guard(percentageCeiling, state.toString(), false, null);
+
+ // if state remains unchanged, no need to notify peers
+ if (state == localState)
+ return;
+
+ localState = state;
+ notifier.accept(state);
+ }
+
+ /**
+ * @return local node disk usage state
+ */
+ @VisibleForTesting
+ public DiskUsageState state()
+ {
+ return localState;
+ }
+
+ /**
+ * @return The current disk usage ratio (including all memtable sizes). This is the ratio between the space taken by
+ * all the data directories and the sum of that same space and the free space available on disk. The space taken by
+ * the data directories is the actual space on disk plus the size of the memtables, which are included because they
+ * are expected to be eventually flushed to disk.
+ */
+ @VisibleForTesting
+ public double getDiskUsage()
+ {
+ // using BigInteger to handle large file system
+ BigInteger used = BigInteger.ZERO; // space used by data directories
+ BigInteger usable = BigInteger.ZERO; // free space on disks
+
+ for (Map.Entry<FileStore, Collection<Directories.DataDirectory>> e : dataDirectoriesSupplier.get().asMap().entrySet())
+ {
+ usable = usable.add(BigInteger.valueOf(usableSpace(e.getKey())));
+
+ for (Directories.DataDirectory dir : e.getValue())
+ used = used.add(BigInteger.valueOf(dir.getRawSize()));
+ }
+
+ // The total disk size for data directories is the space that is actually used by those directories plus the
+ // free space on disk that might be used for storing those directories in the future.
+ BigInteger total = used.add(usable);
+
+ // That total space can be limited by the config property data_disk_usage_max_disk_size.
+ DataStorageSpec diskUsageMaxSize = guardrailsConfigSupplier.get().getDataDiskUsageMaxDiskSize();
+ if (diskUsageMaxSize != null)
+ total = total.min(BigInteger.valueOf(diskUsageMaxSize.toBytes()));
+
+ // Add memtables size to the amount of used space because those memtables will be flushed to data directories.
+ used = used.add(BigInteger.valueOf(getAllMemtableSize()));
+
+ if (logger.isTraceEnabled())
+ logger.trace("Disk Usage Guardrail: current disk usage = {}, total
disk usage = {}.",
+ FileUtils.stringifyFileSize(used.doubleValue()),
+ FileUtils.stringifyFileSize(total.doubleValue()));
+
+ return new BigDecimal(used).divide(new BigDecimal(total), 5,
RoundingMode.UP).doubleValue();
+ }
+
+ @VisibleForTesting
+ public long getAllMemtableSize()
+ {
+ long size = 0;
+
+ for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
+ {
+ for (Memtable memtable : cfs.getTracker().getView().getAllMemtables())
+ {
+ size += memtable.getLiveDataSize();
+ }
+ }
+
+ return size;
+ }
+
+ @VisibleForTesting
+ public DiskUsageState getState(long usagePercentage)
+ {
+ if (!Guardrails.localDataDiskUsage.enabled())
+ return DiskUsageState.NOT_AVAILABLE;
+
+ if (Guardrails.localDataDiskUsage.failsOn(usagePercentage, null))
+ return DiskUsageState.FULL;
+
+ if (Guardrails.localDataDiskUsage.warnsOn(usagePercentage, null))
+ return DiskUsageState.STUFFED;
+
+ return DiskUsageState.SPACIOUS;
+ }
+
+ public static Multimap<FileStore, Directories.DataDirectory> dataDirectoriesGroupedByFileStore()
+ {
+ Multimap<FileStore, Directories.DataDirectory> directories = HashMultimap.create();
+ try
+ {
+ for (Directories.DataDirectory dir : Directories.dataDirectories.getAllDirectories())
+ {
+ FileStore store = Files.getFileStore(dir.location.toPath());
+ directories.put(store, dir);
+ }
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException("Cannot get data directories grouped by
file store", e);
+ }
+ return directories;
+ }
+
+ public static long totalSpace(FileStore store)
+ {
+ try
+ {
+ long size = store.getTotalSpace();
+ return size < 0 ? Long.MAX_VALUE : size;
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException("Cannot get total space of file store",
e);
+ }
+ }
+
+ public static long usableSpace(FileStore store)
+ {
+ try
+ {
+ long size = store.getUsableSpace();
+ return size < 0 ? Long.MAX_VALUE : size;
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException("Cannot get usable size of file store",
e);
+ }
+ }
+}
+
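A worked example of the ratio computed by getDiskUsage(), with assumed numbers that are not taken from the patch: 40 GiB already on disk across the data directories, 5 GiB of live memtable data, 60 GiB of usable space, and data_disk_usage_max_disk_size left unset:

    // used (data directories on disk)        = 40 GiB
    // usable (free space on the file stores) = 60 GiB
    // total = used + usable                  = 100 GiB   (no max-disk-size cap configured)
    // used + memtables                       = 40 GiB + 5 GiB = 45 GiB
    // ratio = 45 / 100                       = 0.45
    // updateLocalState() then scales this to a percentage and takes the ceiling, so the node
    // reports 45%, which is compared against the configured warn/fail percentage thresholds.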
diff --git
a/src/java/org/apache/cassandra/service/disk/usage/DiskUsageState.java
b/src/java/org/apache/cassandra/service/disk/usage/DiskUsageState.java
new file mode 100644
index 0000000000..9a46251ff8
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/disk/usage/DiskUsageState.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.disk.usage;
+
+import org.apache.cassandra.db.guardrails.GuardrailsConfig;
+
+public enum DiskUsageState
+{
+ /** Either the disk usage guardrail is not enabled or the gossip state is not ready. */
+ NOT_AVAILABLE("Not Available"),
+
+ /**
+ * Disk usage is below both {@link GuardrailsConfig#getDataDiskUsagePercentageWarnThreshold()} and
+ * {@link GuardrailsConfig#getDataDiskUsagePercentageFailThreshold()}.
+ */
+ SPACIOUS("Spacious"),
+
+ /**
+ * Disk usage exceeds {@link GuardrailsConfig#getDataDiskUsagePercentageWarnThreshold()} but is below
+ * {@link GuardrailsConfig#getDataDiskUsagePercentageFailThreshold()}.
+ */
+ STUFFED("Stuffed"),
+
+ /** Disk usage exceeds {@link GuardrailsConfig#getDataDiskUsagePercentageFailThreshold()}. */
+ FULL("Full");
+
+ private final String msg;
+
+ DiskUsageState(String msg)
+ {
+ this.msg = msg;
+ }
+
+ public boolean isFull()
+ {
+ return this == FULL;
+ }
+
+ public boolean isStuffed()
+ {
+ return this == STUFFED;
+ }
+
+ public boolean isStuffedOrFull()
+ {
+ return isFull() || isStuffed();
+ }
+
+ @Override
+ public String toString()
+ {
+ return msg;
+ }
+}
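In miniature, how a state value travels through Gossip (a sketch mirroring VersionedValue.diskUsage() and DiskUsageBroadcaster.onChange() above; not new behaviour):

    // Serialise: the enum constant name is gossiped, not the pretty toString().
    String payload = DiskUsageState.STUFFED.name();          // "STUFFED"

    // Deserialise on the receiving side, falling back to NOT_AVAILABLE on unknown values,
    // e.g. when a newer node gossips a state this node does not know yet.
    DiskUsageState received;
    try
    {
        received = DiskUsageState.valueOf(payload);
    }
    catch (IllegalArgumentException e)
    {
        received = DiskUsageState.NOT_AVAILABLE;
    }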
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailDiskUsageTest.java
b/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailDiskUsageTest.java
new file mode 100644
index 0000000000..e1868e280f
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailDiskUsageTest.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.guardrails;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.Callable;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.InvalidQueryException;
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.db.guardrails.Guardrails;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.service.disk.usage.DiskUsageBroadcaster;
+import org.apache.cassandra.service.disk.usage.DiskUsageMonitor;
+import org.apache.cassandra.service.disk.usage.DiskUsageState;
+import org.assertj.core.api.Assertions;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+
+/**
+ * Tests the guardrails for disk usage, {@link Guardrails#localDataDiskUsage} and {@link Guardrails#replicaDiskUsage}.
+ */
+public class GuardrailDiskUsageTest extends GuardrailTester
+{
+ private static final int NUM_ROWS = 100;
+
+ private static final String WARN_MESSAGE = "Replica disk usage exceeds warning threshold";
+ private static final String FAIL_MESSAGE = "Write request failed because disk usage exceeds failure threshold";
+
+ private static Cluster cluster;
+ private static com.datastax.driver.core.Cluster driverCluster;
+ private static Session driverSession;
+
+ @BeforeClass
+ public static void setupCluster() throws IOException
+ {
+ // speed up the task that calculates and propagates the disk usage info
+ CassandraRelevantProperties.DISK_USAGE_MONITOR_INTERVAL_MS.setInt(100);
+
+ // build a 2-node cluster with RF=1
+ cluster = init(Cluster.build(2)
+ .withInstanceInitializer(DiskStateInjection::install)
+ .withConfig(c -> c.with(Feature.GOSSIP, Feature.NATIVE_PROTOCOL)
+ .set("data_disk_usage_percentage_warn_threshold", 98)
+ .set("data_disk_usage_percentage_fail_threshold", 99)
+ .set("authenticator", "PasswordAuthenticator"))
+ .start(), 1);
+
+ // create a regular user, since the default superuser is excluded from guardrails
+ com.datastax.driver.core.Cluster.Builder builder = com.datastax.driver.core.Cluster.builder().addContactPoint("127.0.0.1");
+ try (com.datastax.driver.core.Cluster c = builder.withCredentials("cassandra", "cassandra").build();
+ Session session = c.connect())
+ {
+ session.execute("CREATE USER test WITH PASSWORD 'test'");
+ }
+
+ // connect using that regular user; we use the driver to get access to the client warnings
+ driverCluster = builder.withCredentials("test", "test").build();
+ driverSession = driverCluster.connect();
+ }
+
+ @AfterClass
+ public static void teardownCluster()
+ {
+ if (driverSession != null)
+ driverSession.close();
+
+ if (driverCluster != null)
+ driverCluster.close();
+
+ if (cluster != null)
+ cluster.close();
+ }
+
+ @Override
+ protected Cluster getCluster()
+ {
+ return cluster;
+ }
+
+ @Test
+ public void testDiskUsage() throws Throwable
+ {
+ schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v int)");
+ String insert = format("INSERT INTO %s(k, v) VALUES (?, 0)");
+
+ // With both nodes in SPACIOUS state, we can write without warnings or failures
+ for (int i = 0; i < NUM_ROWS; i++)
+ {
+ ResultSet rs = driverSession.execute(insert, i);
+
Assertions.assertThat(rs.getExecutionInfo().getWarnings()).isEmpty();
+ }
+
+ // If the disk usage information about one node becomes unavailable, we can still write without warnings
+ DiskStateInjection.setState(getCluster(), 2,
DiskUsageState.NOT_AVAILABLE);
+ for (int i = 0; i < NUM_ROWS; i++)
+ {
+ ResultSet rs = driverSession.execute(insert, i);
+
Assertions.assertThat(rs.getExecutionInfo().getWarnings()).isEmpty();
+ }
+
+ // If one node becomes STUFFED, the writes targeting that node will raise a warning, while the writes
+ // targeting the node that remains SPACIOUS will keep succeeding without warnings
+ DiskStateInjection.setState(getCluster(), 2, DiskUsageState.STUFFED);
+ int numWarnings = 0;
+ for (int i = 0; i < NUM_ROWS; i++)
+ {
+ ResultSet rs = driverSession.execute(insert, i);
+
+ List<String> warnings = rs.getExecutionInfo().getWarnings();
+ if (!warnings.isEmpty())
+ {
+ Assertions.assertThat(warnings).hasSize(1).anyMatch(s ->
s.contains(WARN_MESSAGE));
+ numWarnings++;
+ }
+ }
+
Assertions.assertThat(numWarnings).isGreaterThan(0).isLessThan(NUM_ROWS);
+
+ // If the STUFFED node becomes FULL, the writes targeting that node will fail, while the writes targeting
+ // the node that remains SPACIOUS will keep succeeding without warnings
+ DiskStateInjection.setState(getCluster(), 2, DiskUsageState.FULL);
+ int numFailures = 0;
+ for (int i = 0; i < NUM_ROWS; i++)
+ {
+ try
+ {
+ ResultSet rs = driverSession.execute(insert, i);
+
Assertions.assertThat(rs.getExecutionInfo().getWarnings()).isEmpty();
+ }
+ catch (InvalidQueryException e)
+ {
+ Assertions.assertThat(e).hasMessageContaining(FAIL_MESSAGE);
+ numFailures++;
+ }
+ }
+
Assertions.assertThat(numFailures).isGreaterThan(0).isLessThan(NUM_ROWS);
+
+ // If both nodes are FULL, all queries will fail
+ DiskStateInjection.setState(getCluster(), 1, DiskUsageState.FULL);
+ for (int i = 0; i < NUM_ROWS; i++)
+ {
+ try
+ {
+ driverSession.execute(insert, i);
+ Assertions.fail("Should have failed");
+ }
+ catch (InvalidQueryException e)
+ {
+ numFailures++;
+ }
+ }
+
+ // Finally, if both nodes go back to SPACIOUS, all queries will succeed again
+ DiskStateInjection.setState(getCluster(), 1, DiskUsageState.SPACIOUS);
+ DiskStateInjection.setState(getCluster(), 2, DiskUsageState.SPACIOUS);
+ for (int i = 0; i < NUM_ROWS; i++)
+ {
+ ResultSet rs = driverSession.execute(insert, i);
+
Assertions.assertThat(rs.getExecutionInfo().getWarnings()).isEmpty();
+ }
+ }
+
+ /**
+ * ByteBuddy rule to override the disk usage state of each node.
+ */
+ public static class DiskStateInjection
+ {
+ public static volatile DiskUsageState state = DiskUsageState.SPACIOUS;
+
+ private static void install(ClassLoader cl, int node)
+ {
+ new ByteBuddy().rebase(DiskUsageMonitor.class)
+ .method(named("getState"))
+
.intercept(MethodDelegation.to(DiskStateInjection.class))
+ .make()
+ .load(cl, ClassLoadingStrategy.Default.INJECTION);
+ }
+
+ public static void setState(Cluster cluster, int node, DiskUsageState
state)
+ {
+ IInvokableInstance instance = cluster.get(node);
+ instance.runOnInstance(() -> DiskStateInjection.state = state);
+
+ // wait for disk usage state propagation, all nodes must see it
+ InetAddressAndPort endpoint = InetAddressAndPort.getByAddress(instance.broadcastAddress());
+ cluster.forEach(n -> n.runOnInstance(() -> Util.spinAssertEquals(state, () -> DiskUsageBroadcaster.instance.state(endpoint), 60)));
+ }
+
+ @SuppressWarnings("unused")
+ public static DiskUsageState getState(long usagePercentage, @SuperCall Callable<DiskUsageState> zuper)
+ {
+ return state;
+ }
+ }
+}
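Conceptually, the MethodDelegation installed by DiskStateInjection amounts to the following override (a reading aid, not code from the patch); ByteBuddy is used instead so the replacement can be applied inside each in-JVM dtest instance's own classloader:

    // Conceptual equivalent of intercepting DiskUsageMonitor#getState(long):
    public class FixedStateMonitor extends DiskUsageMonitor
    {
        @Override
        public DiskUsageState getState(long usagePercentage)
        {
            // ignore the computed percentage and return whatever the test last set
            return DiskStateInjection.state;
        }
    }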
diff --git a/test/unit/org/apache/cassandra/config/DataStorageSpecTest.java
b/test/unit/org/apache/cassandra/config/DataStorageSpecTest.java
index 66c644498d..4dad97a0a8 100644
--- a/test/unit/org/apache/cassandra/config/DataStorageSpecTest.java
+++ b/test/unit/org/apache/cassandra/config/DataStorageSpecTest.java
@@ -101,11 +101,36 @@ public class DataStorageSpecTest
public void testEquals()
{
assertEquals(new DataStorageSpec("10B"), new DataStorageSpec("10B"));
+
assertEquals(new DataStorageSpec("10KiB"), new
DataStorageSpec("10240B"));
assertEquals(new DataStorageSpec("10240B"), new
DataStorageSpec("10KiB"));
+
+ assertEquals(new DataStorageSpec("10MiB"), new
DataStorageSpec("10240KiB"));
+ assertEquals(new DataStorageSpec("10240KiB"), new
DataStorageSpec("10MiB"));
+
+ assertEquals(new DataStorageSpec("10GiB"), new
DataStorageSpec("10240MiB"));
+ assertEquals(new DataStorageSpec("10240MiB"), new
DataStorageSpec("10GiB"));
+
+ assertNotEquals(DataStorageSpec.inBytes(Long.MAX_VALUE),
DataStorageSpec.inGibibytes(Long.MAX_VALUE));
+ assertNotEquals(DataStorageSpec.inBytes(Long.MAX_VALUE),
DataStorageSpec.inMebibytes(Long.MAX_VALUE));
+ assertNotEquals(DataStorageSpec.inBytes(Long.MAX_VALUE),
DataStorageSpec.inKibibytes(Long.MAX_VALUE));
+ assertEquals(DataStorageSpec.inBytes(Long.MAX_VALUE),
DataStorageSpec.inBytes(Long.MAX_VALUE));
+
+ assertNotEquals(DataStorageSpec.inKibibytes(Long.MAX_VALUE),
DataStorageSpec.inGibibytes(Long.MAX_VALUE));
+ assertNotEquals(DataStorageSpec.inKibibytes(Long.MAX_VALUE),
DataStorageSpec.inMebibytes(Long.MAX_VALUE));
+ assertEquals(DataStorageSpec.inKibibytes(Long.MAX_VALUE),
DataStorageSpec.inKibibytes(Long.MAX_VALUE));
+ assertNotEquals(DataStorageSpec.inKibibytes(Long.MAX_VALUE),
DataStorageSpec.inBytes(Long.MAX_VALUE));
+
+ assertNotEquals(DataStorageSpec.inMebibytes(Long.MAX_VALUE),
DataStorageSpec.inGibibytes(Long.MAX_VALUE));
assertEquals(DataStorageSpec.inMebibytes(Long.MAX_VALUE),
DataStorageSpec.inMebibytes(Long.MAX_VALUE));
- assertNotEquals(DataStorageSpec.inMebibytes(Long.MAX_VALUE),
DataStorageSpec.inKibibytes(Long.MAX_VALUE));
assertNotEquals(DataStorageSpec.inMebibytes(Long.MAX_VALUE),
DataStorageSpec.inBytes(Long.MAX_VALUE));
+ assertNotEquals(DataStorageSpec.inMebibytes(Long.MAX_VALUE),
DataStorageSpec.inBytes(Long.MAX_VALUE));
+
+ assertEquals(DataStorageSpec.inGibibytes(Long.MAX_VALUE),
DataStorageSpec.inGibibytes(Long.MAX_VALUE));
+ assertNotEquals(DataStorageSpec.inGibibytes(Long.MAX_VALUE),
DataStorageSpec.inMebibytes(Long.MAX_VALUE));
+ assertNotEquals(DataStorageSpec.inGibibytes(Long.MAX_VALUE),
DataStorageSpec.inKibibytes(Long.MAX_VALUE));
+ assertNotEquals(DataStorageSpec.inGibibytes(Long.MAX_VALUE),
DataStorageSpec.inBytes(Long.MAX_VALUE));
+
assertNotEquals(new DataStorageSpec("0MiB"), new
DataStorageSpec("10KiB"));
}
@@ -138,4 +163,4 @@ public class DataStorageSpecTest
Gen<DataStorageSpec> gen = rs -> new
DataStorageSpec(valueGen.generate(rs), unitGen.generate(rs));
return gen.describedAs(DataStorageSpec::toString);
}
-}
\ No newline at end of file
+}
diff --git
a/test/unit/org/apache/cassandra/db/guardrails/GuardrailCollectionSizeTest.java
b/test/unit/org/apache/cassandra/db/guardrails/GuardrailCollectionSizeTest.java
index 482e37b2ef..1483e8101f 100644
---
a/test/unit/org/apache/cassandra/db/guardrails/GuardrailCollectionSizeTest.java
+++
b/test/unit/org/apache/cassandra/db/guardrails/GuardrailCollectionSizeTest.java
@@ -48,12 +48,12 @@ public class GuardrailCollectionSizeTest extends
ThresholdTester
public GuardrailCollectionSizeTest()
{
- super(WARN_THRESHOLD / 1024, // to KiB
- FAIL_THRESHOLD / 1024, // to KiB
+ super(WARN_THRESHOLD + "B",
+ FAIL_THRESHOLD + "B",
Guardrails.collectionSize,
- Guardrails::setCollectionSizeThresholdInKiB,
- Guardrails::getCollectionSizeWarnThresholdInKiB,
- Guardrails::getCollectionSizeFailThresholdInKiB);
+ Guardrails::setCollectionSizeThreshold,
+ Guardrails::getCollectionSizeWarnThreshold,
+ Guardrails::getCollectionSizeFailThreshold);
}
@After
diff --git
a/test/unit/org/apache/cassandra/db/guardrails/GuardrailDiskUsageTest.java
b/test/unit/org/apache/cassandra/db/guardrails/GuardrailDiskUsageTest.java
new file mode 100644
index 0000000000..14c2c3acea
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/guardrails/GuardrailDiskUsageTest.java
@@ -0,0 +1,617 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.guardrails;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.nio.file.FileStore;
+import java.util.Arrays;
+import java.util.function.Consumer;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Multimap;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DataStorageSpec;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.disk.usage.DiskUsageBroadcaster;
+import org.apache.cassandra.service.disk.usage.DiskUsageMonitor;
+import org.apache.cassandra.service.disk.usage.DiskUsageState;
+import org.apache.cassandra.utils.FBUtilities;
+import org.mockito.Mockito;
+
+import static org.apache.cassandra.service.disk.usage.DiskUsageState.FULL;
+import static
org.apache.cassandra.service.disk.usage.DiskUsageState.NOT_AVAILABLE;
+import static org.apache.cassandra.service.disk.usage.DiskUsageState.SPACIOUS;
+import static org.apache.cassandra.service.disk.usage.DiskUsageState.STUFFED;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests the guardrails for disk usage, {@link Guardrails#localDataDiskUsage} and {@link Guardrails#replicaDiskUsage}.
+ */
+public class GuardrailDiskUsageTest extends GuardrailTester
+{
+ private static int defaultDataDiskUsagePercentageWarnThreshold;
+ private static int defaultDataDiskUsagePercentageFailThreshold;
+
+ @BeforeClass
+ public static void beforeClass()
+ {
+ defaultDataDiskUsagePercentageWarnThreshold =
Guardrails.instance.getDataDiskUsagePercentageWarnThreshold();
+ defaultDataDiskUsagePercentageFailThreshold =
Guardrails.instance.getDataDiskUsagePercentageFailThreshold();
+
+ Guardrails.instance.setDataDiskUsagePercentageThreshold(-1, -1);
+ }
+
+ @AfterClass
+ public static void afterClass()
+ {
+
Guardrails.instance.setDataDiskUsagePercentageThreshold(defaultDataDiskUsagePercentageWarnThreshold,
+
defaultDataDiskUsagePercentageFailThreshold);
+ }
+
+ @Test
+ public void testConfigValidation()
+ {
+ assertConfigValid(x -> x.setDataDiskUsageMaxDiskSize(null));
+ assertNull(guardrails().getDataDiskUsageMaxDiskSize());
+
+ assertConfigFails(x -> x.setDataDiskUsageMaxDiskSize("0B"), "0 is not
allowed");
+ assertConfigFails(x -> x.setDataDiskUsageMaxDiskSize("0KiB"), "0 is
not allowed");
+ assertConfigFails(x -> x.setDataDiskUsageMaxDiskSize("0MiB"), "0 is
not allowed");
+ assertConfigFails(x -> x.setDataDiskUsageMaxDiskSize("0GiB"), "0 is
not allowed");
+
+ assertConfigValid(x -> x.setDataDiskUsageMaxDiskSize("10B"));
+ assertEquals("10B", guardrails().getDataDiskUsageMaxDiskSize());
+
+ assertConfigValid(x -> x.setDataDiskUsageMaxDiskSize("20KiB"));
+ assertEquals("20KiB", guardrails().getDataDiskUsageMaxDiskSize());
+
+ assertConfigValid(x -> x.setDataDiskUsageMaxDiskSize("30MiB"));
+ assertEquals("30MiB", guardrails().getDataDiskUsageMaxDiskSize());
+
+ assertConfigValid(x -> x.setDataDiskUsageMaxDiskSize("40GiB"));
+ assertEquals("40GiB", guardrails().getDataDiskUsageMaxDiskSize());
+
+ assertConfigFails(x -> x.setDataDiskUsageMaxDiskSize(Long.MAX_VALUE +
"GiB"), "are actually available on disk");
+
+ // warn threshold smaller than lower bound
+ assertConfigFails(x -> x.setDataDiskUsagePercentageThreshold(0, 80),
"0 is not allowed");
+
+ // fail threshold bigger than upper bound
+ assertConfigFails(x -> x.setDataDiskUsagePercentageThreshold(1, 110),
"maximum allowed value is 100");
+
+ // warn threshold larger than fail threshold
+ assertConfigFails(x -> x.setDataDiskUsagePercentageThreshold(60, 50),
+ "The warn threshold 60 for
data_disk_usage_percentage_warn_threshold should be lower than the fail
threshold 50");
+ }
+
+ @Test
+ public void testDiskUsageState()
+ {
+ guardrails().setDataDiskUsagePercentageThreshold(50, 90);
+
+ // under usage
+ assertEquals(SPACIOUS, DiskUsageMonitor.instance.getState(10));
+ assertEquals(SPACIOUS, DiskUsageMonitor.instance.getState(50));
+
+ // exceed warning threshold
+ assertEquals(STUFFED, DiskUsageMonitor.instance.getState(51));
+ assertEquals(STUFFED, DiskUsageMonitor.instance.getState(56));
+ assertEquals(STUFFED, DiskUsageMonitor.instance.getState(90));
+
+ // exceed fail threshold
+ assertEquals(FULL, DiskUsageMonitor.instance.getState(91));
+ assertEquals(FULL, DiskUsageMonitor.instance.getState(100));
+
+ // shouldn't be possible to go over 100% but just to be sure
+ assertEquals(FULL, DiskUsageMonitor.instance.getState(101));
+ assertEquals(FULL, DiskUsageMonitor.instance.getState(500));
+ }
+
+ @Test
+ public void testDiskUsageDetectorWarnDisabled()
+ {
+ guardrails().setDataDiskUsagePercentageThreshold(-1, 90);
+
+ // under usage
+ assertEquals(SPACIOUS, DiskUsageMonitor.instance.getState(0));
+ assertEquals(SPACIOUS, DiskUsageMonitor.instance.getState(50));
+ assertEquals(SPACIOUS, DiskUsageMonitor.instance.getState(90));
+
+ // exceed fail threshold
+ assertEquals(FULL, DiskUsageMonitor.instance.getState(91));
+ assertEquals(FULL, DiskUsageMonitor.instance.getState(100));
+ }
+
+ @Test
+ public void testDiskUsageDetectorFailDisabled()
+ {
+ guardrails().setDataDiskUsagePercentageThreshold(50, -1);
+
+ // under usage
+ assertEquals(SPACIOUS, DiskUsageMonitor.instance.getState(50));
+
+ // exceed warning threshold
+ assertEquals(STUFFED, DiskUsageMonitor.instance.getState(51));
+ assertEquals(STUFFED, DiskUsageMonitor.instance.getState(80));
+ assertEquals(STUFFED, DiskUsageMonitor.instance.getState(100));
+ }
+
+ @Test
+ public void testDiskUsageGuardrailDisabled()
+ {
+ guardrails().setDataDiskUsagePercentageThreshold(-1, -1);
+
+ assertEquals(NOT_AVAILABLE, DiskUsageMonitor.instance.getState(0));
+ assertEquals(NOT_AVAILABLE, DiskUsageMonitor.instance.getState(60));
+ assertEquals(NOT_AVAILABLE, DiskUsageMonitor.instance.getState(100));
+ }
+
+ @Test
+ public void testMemtableSizeIncluded() throws Throwable
+ {
+ DiskUsageMonitor monitor = new DiskUsageMonitor();
+
+ createTable(keyspace(), "CREATE TABLE %s (k text PRIMARY KEY, v text)
WITH compression = { 'enabled': false }");
+
+ long memtableSizeBefore = monitor.getAllMemtableSize();
+ int rows = 10;
+ int mb = 1024 * 1024;
+
+ for (int i = 0; i < rows; i++)
+ {
+ char[] chars = new char[mb];
+ Arrays.fill(chars, (char) i);
+ String value = String.copyValueOf(chars);
+ execute("INSERT INTO %s (k, v) VALUES(?, ?)", i, value);
+ }
+
+ // verify memtables are included
+ long memtableSizeAfterInsert = monitor.getAllMemtableSize();
+ assertTrue(String.format("Expect at least 10MB more data, but got
before: %s and after: %d",
+ memtableSizeBefore, memtableSizeAfterInsert),
+ memtableSizeAfterInsert - memtableSizeBefore >= rows * mb);
+
+ // verify that the memtable size is reduced after flush
+ flush();
+ long memtableSizeAfterFlush = monitor.getAllMemtableSize();
+ assertEquals(memtableSizeBefore, memtableSizeAfterFlush, mb);
+ }
+
+ @Test
+ public void testMonitorLogsOnStateChange()
+ {
+ guardrails().setDataDiskUsagePercentageThreshold(50, 90);
+
+ Guardrails.localDataDiskUsage.resetLastNotifyTime();
+
+ DiskUsageMonitor monitor = new DiskUsageMonitor();
+
+ // transit to SPACIOUS, no logging
+ assertMonitorStateTransition(0.50, SPACIOUS, monitor);
+
+ // transit to STUFFED, expect warning
+ assertMonitorStateTransition(0.50001, STUFFED, monitor, true, "Local data disk usage 51%(Stuffed) exceeds warning threshold of 50%");
+
+ // remain as STUFFED, no logging because of min log interval
+ assertMonitorStateTransition(0.90, STUFFED, monitor);
+
+ // transit to FULL, expect failure
+ assertMonitorStateTransition(0.90001, FULL, monitor, false, "Local data disk usage 91%(Full) exceeds failure threshold of 90%, will stop accepting writes");
+
+ // remain as FULL, no logging because of min log interval
+ assertMonitorStateTransition(0.99, FULL, monitor);
+
+ // remain as FULL, no logging because of min log interval
+ assertMonitorStateTransition(5.0, FULL, monitor);
+
+ // transit back to STUFFED, no warning because of min log interval
+ assertMonitorStateTransition(0.90, STUFFED, monitor);
+
+ // transit back to FULL, no logging because of min log interval
+ assertMonitorStateTransition(0.900001, FULL, monitor);
+
+ // transit back to STUFFED, no logging because of min log interval
+ assertMonitorStateTransition(0.90, STUFFED, monitor);
+
+ // transit to SPACIOUS, no logging
+ assertMonitorStateTransition(0.50, SPACIOUS, monitor);
+ }
+
+ @Test
+ public void testDiskUsageBroadcaster() throws UnknownHostException
+ {
+ DiskUsageBroadcaster broadcaster = new DiskUsageBroadcaster(null);
+ Gossiper.instance.unregister(broadcaster);
+
+ InetAddressAndPort node1 = InetAddressAndPort.getByName("127.0.0.1");
+ InetAddressAndPort node2 = InetAddressAndPort.getByName("127.0.0.2");
+ InetAddressAndPort node3 = InetAddressAndPort.getByName("127.0.0.3");
+
+ // initially it's NOT_AVAILABLE
+ assertFalse(broadcaster.hasStuffedOrFullNode());
+ assertFalse(broadcaster.isFull(node1));
+ assertFalse(broadcaster.isFull(node2));
+ assertFalse(broadcaster.isFull(node3));
+
+ // adding 1st node: Spacious, cluster has no Full node
+ broadcaster.onChange(node1, ApplicationState.DISK_USAGE,
value(SPACIOUS));
+ assertFalse(broadcaster.hasStuffedOrFullNode());
+ assertFalse(broadcaster.isFull(node1));
+
+ // adding 2nd node with wrong ApplicationState
+ broadcaster.onChange(node2, ApplicationState.RACK, value(FULL));
+ assertFalse(broadcaster.hasStuffedOrFullNode());
+ assertFalse(broadcaster.isFull(node2));
+
+ // adding 2nd node: STUFFED
+ broadcaster.onChange(node2, ApplicationState.DISK_USAGE,
value(STUFFED));
+ assertTrue(broadcaster.hasStuffedOrFullNode());
+ assertTrue(broadcaster.isStuffed(node2));
+
+ // adding 3rd node: FULL
+ broadcaster.onChange(node3, ApplicationState.DISK_USAGE, value(FULL));
+ assertTrue(broadcaster.hasStuffedOrFullNode());
+ assertTrue(broadcaster.isFull(node3));
+
+ // remove 2nd node, cluster has Full node
+ broadcaster.onRemove(node2);
+ assertTrue(broadcaster.hasStuffedOrFullNode());
+ assertFalse(broadcaster.isStuffed(node2));
+
+ // remove 3rd node, cluster has no Full node
+ broadcaster.onRemove(node3);
+ assertFalse(broadcaster.hasStuffedOrFullNode());
+ assertFalse(broadcaster.isFull(node3));
+ }
+
+ @Test
+ public void testDiskUsageCalculationWithMaxDiskSize() throws IOException
+ {
+ Directories.DataDirectory directory =
mock(Directories.DataDirectory.class);
+
when(directory.getRawSize()).thenReturn(DataStorageSpec.inGibibytes(5).toBytes());
+
+ FileStore store = mock(FileStore.class);
+
when(store.getUsableSpace()).thenReturn(DataStorageSpec.inGibibytes(100 -
5).toBytes()); // 100GiB disk
+
+ Multimap<FileStore, Directories.DataDirectory> directories =
HashMultimap.create();
+ directories.put(store, directory);
+ DiskUsageMonitor monitor = spy(new DiskUsageMonitor(() ->
directories));
+
+ doCallRealMethod().when(monitor).getDiskUsage();
+ doReturn(0L).when(monitor).getAllMemtableSize();
+
+ guardrails().setDataDiskUsageMaxDiskSize(null);
+ assertThat(monitor.getDiskUsage()).isEqualTo(0.05);
+
+ // 5G are used of 10G
+ guardrails().setDataDiskUsageMaxDiskSize("10GiB");
+ assertThat(monitor.getDiskUsage()).isEqualTo(0.5);
+
+ // max disk size = space used
+ guardrails().setDataDiskUsageMaxDiskSize("5GiB");
+ assertThat(monitor.getDiskUsage()).isEqualTo(1.0);
+
+ // max disk size < space used
+ guardrails().setDataDiskUsageMaxDiskSize("1GiB");
+ assertThat(monitor.getDiskUsage()).isEqualTo(5.0);
+ }
+
+ @Test
+ public void testDiskUsageCalculationWithMaxDiskSizeAndSmallUnits() throws
IOException
+ {
+ // 5MiB used out of a 100GiB disk
+ long freeDiskSizeInBytes = DataStorageSpec.inGibibytes(100).toBytes() - DataStorageSpec.inMebibytes(5).toBytes();
+
+ FileStore store = mock(FileStore.class);
+
when(store.getUsableSpace()).thenReturn(DataStorageSpec.inBytes(freeDiskSizeInBytes).toBytes());
// 100GiB disk
+
+ Directories.DataDirectory directory =
mock(Directories.DataDirectory.class);
+
when(directory.getRawSize()).thenReturn(DataStorageSpec.inMebibytes(5).toBytes());
+
+ Multimap<FileStore, Directories.DataDirectory> directories =
HashMultimap.create();
+ directories.put(store, directory);
+ DiskUsageMonitor monitor = spy(new DiskUsageMonitor(() ->
directories));
+
+ doCallRealMethod().when(monitor).getDiskUsage();
+ doReturn(0L).when(monitor).getAllMemtableSize();
+
+ guardrails().setDataDiskUsageMaxDiskSize(null);
+ assertThat(monitor.getDiskUsage()).isEqualTo(0.00005);
+
+ // 5MiB are used of 10MiB
+ guardrails().setDataDiskUsageMaxDiskSize("10MiB");
+ assertThat(monitor.getDiskUsage()).isEqualTo(0.5);
+
+ // max disk size = space used
+ guardrails().setDataDiskUsageMaxDiskSize("5MiB");
+ assertThat(monitor.getDiskUsage()).isEqualTo(1.0);
+
+ // max disk size < space used
+ guardrails().setDataDiskUsageMaxDiskSize("1MiB");
+ assertThat(monitor.getDiskUsage()).isEqualTo(5.0);
+ }
+
+ @Test
+ public void testDiskUsageCalculationWithMaxDiskSizeAndMultipleVolumes()
throws IOException
+ {
+ Mockito.reset();
+
+ Multimap<FileStore, Directories.DataDirectory> directories =
HashMultimap.create();
+
+ Directories.DataDirectory directory1 =
mock(Directories.DataDirectory.class);
+ FileStore store1 = mock(FileStore.class);
+
when(directory1.getRawSize()).thenReturn(DataStorageSpec.inGibibytes(5).toBytes());
+
when(store1.getUsableSpace()).thenReturn(DataStorageSpec.inGibibytes(100 -
5).toBytes()); // 100 GiB disk
+ directories.put(store1, directory1);
+
+ Directories.DataDirectory directory2 =
mock(Directories.DataDirectory.class);
+ FileStore store2 = mock(FileStore.class);
+
when(directory2.getRawSize()).thenReturn(DataStorageSpec.inGibibytes(25).toBytes());
+
when(store2.getUsableSpace()).thenReturn(DataStorageSpec.inGibibytes(100 -
25).toBytes()); // 100 GiB disk
+ directories.put(store2, directory2);
+
+ Directories.DataDirectory directory3 =
mock(Directories.DataDirectory.class);
+ FileStore store3 = mock(FileStore.class);
+
when(directory3.getRawSize()).thenReturn(DataStorageSpec.inGibibytes(20).toBytes());
+
when(store3.getUsableSpace()).thenReturn(DataStorageSpec.inGibibytes(100 -
20).toBytes()); // 100 GiB disk
+ directories.put(store3, directory3);
+
+ DiskUsageMonitor monitor = spy(new DiskUsageMonitor(() ->
directories));
+
+ doCallRealMethod().when(monitor).getDiskUsage();
+ doReturn(0L).when(monitor).getAllMemtableSize();
+
+ // 50G/300G as each disk has a capacity of 100G
+ guardrails().setDataDiskUsageMaxDiskSize(null);
+ assertThat(monitor.getDiskUsage()).isEqualTo(0.16667);
+
+ // 50G/100G
+ guardrails().setDataDiskUsageMaxDiskSize("100GiB");
+ assertThat(monitor.getDiskUsage()).isEqualTo(0.5);
+
+ // 50G/75G
+ guardrails().setDataDiskUsageMaxDiskSize("75GiB");
+ assertThat(monitor.getDiskUsage()).isEqualTo(0.66667);
+
+ // 50G/50G
+ guardrails().setDataDiskUsageMaxDiskSize("50GiB");
+ assertThat(monitor.getDiskUsage()).isEqualTo(1.0);
+
+ // 50G/49G
+ guardrails().setDataDiskUsageMaxDiskSize("49GiB");
+ assertThat(monitor.getDiskUsage()).isEqualTo(1.02041);
+ }
+
+ @Test
+ public void
testDiskUsageCalculationWithMaxDiskSizeAndMultipleDirectories() throws
IOException
+ {
+ Mockito.reset();
+
+ Directories.DataDirectory directory1 =
mock(Directories.DataDirectory.class);
+
when(directory1.getRawSize()).thenReturn(DataStorageSpec.inGibibytes(5).toBytes());
+
+ Directories.DataDirectory directory2 =
mock(Directories.DataDirectory.class);
+
when(directory2.getRawSize()).thenReturn(DataStorageSpec.inGibibytes(25).toBytes());
+
+ Directories.DataDirectory directory3 =
mock(Directories.DataDirectory.class);
+
when(directory3.getRawSize()).thenReturn(DataStorageSpec.inGibibytes(20).toBytes());
+
+ FileStore store = mock(FileStore.class);
+
when(store.getUsableSpace()).thenReturn(DataStorageSpec.inGibibytes(300 - 5 -
25 - 20).toBytes()); // 100 GiB disk
+
+ Multimap<FileStore, Directories.DataDirectory> directories =
HashMultimap.create();
+ directories.putAll(store, ImmutableSet.of(directory1, directory2,
directory3));
+
+ DiskUsageMonitor monitor = spy(new DiskUsageMonitor(() ->
directories));
+
+ doCallRealMethod().when(monitor).getDiskUsage();
+ doReturn(0L).when(monitor).getAllMemtableSize();
+
+ // 50G/300G as disk has a capacity of 300G
+ guardrails().setDataDiskUsageMaxDiskSize(null);
+ assertThat(monitor.getDiskUsage()).isEqualTo(0.16667);
+
+ // 50G/100G
+ guardrails().setDataDiskUsageMaxDiskSize("100GiB");
+ assertThat(monitor.getDiskUsage()).isEqualTo(0.5);
+
+ // 50G/75G
+ guardrails().setDataDiskUsageMaxDiskSize("75GiB");
+ assertThat(monitor.getDiskUsage()).isEqualTo(0.66667);
+
+ // 50G/50G
+ guardrails().setDataDiskUsageMaxDiskSize("50GiB");
+ assertThat(monitor.getDiskUsage()).isEqualTo(1.0);
+
+ // 50G/49G
+ guardrails().setDataDiskUsageMaxDiskSize("49GiB");
+ assertThat(monitor.getDiskUsage()).isEqualTo(1.02041);
+ }
+
+ @Test
+ public void testWriteRequests() throws Throwable
+ {
+ createTable("CREATE TABLE %s (k int PRIMARY KEY, v int)");
+
+ InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort();
+ InetAddressAndPort node1 = InetAddressAndPort.getByName("127.0.0.11");
+ InetAddressAndPort node2 = InetAddressAndPort.getByName("127.0.0.21");
+ InetAddressAndPort node3 = InetAddressAndPort.getByName("127.0.0.31");
+
+ Guardrails.replicaDiskUsage.resetLastNotifyTime();
+ guardrails().setDataDiskUsagePercentageThreshold(98, 99);
+
+ ConsistencyLevel cl = ConsistencyLevel.LOCAL_QUORUM;
+ String select = "SELECT * FROM %s";
+ String insert = "INSERT INTO %s (k, v) VALUES (0, 0)";
+ String batch = "BEGIN BATCH " +
+ "INSERT INTO %s (k, v) VALUES (1, 1);" +
+ "INSERT INTO %<s (k, v) VALUES (2, 2); " +
+ "APPLY BATCH";
+ CheckedFunction userSelect = () -> execute(userClientState, select,
cl);
+ CheckedFunction userInsert = () -> execute(userClientState, insert,
cl);
+ CheckedFunction userBatch = () -> execute(userClientState, batch, cl);
+
+ // default state, write request works fine
+ assertValid(userSelect);
+ assertValid(userInsert);
+ assertValid(userBatch);
+
+ // verify node1 NOT_AVAILABLE won't affect writes
+ setDiskUsageState(node1, NOT_AVAILABLE);
+ assertValid(userSelect);
+ assertValid(userInsert);
+ assertValid(userBatch);
+
+ // verify node2 Spacious won't affect writes
+ setDiskUsageState(node2, SPACIOUS);
+ assertValid(userSelect);
+ assertValid(userInsert);
+ assertValid(userBatch);
+
+ // verify node3 STUFFED won't trigger warning as it's not write replica
+ setDiskUsageState(node3, STUFFED);
+ assertValid(userSelect);
+ assertValid(userInsert);
+ assertValid(userBatch);
+
+ // verify node3 Full won't affect writes as it's not write replica
+ setDiskUsageState(node3, FULL);
+ assertValid(userSelect);
+ assertValid(userInsert);
+ assertValid(userBatch);
+
+ // verify local node STUFFED, will log warning
+ setDiskUsageState(local, STUFFED);
+ assertValid(userSelect);
+ assertWarns(userInsert);
+ assertWarns(userBatch);
+
+ // verify local node Full, will reject writes
+ setDiskUsageState(local, FULL);
+ assertValid(userSelect);
+ assertFails(userInsert);
+ assertFails(userBatch);
+
+ // excluded users can write to FULL cluster
+ useSuperUser();
+ Guardrails.replicaDiskUsage.resetLastNotifyTime();
+ for (ClientState excludedUser : Arrays.asList(systemClientState,
superClientState))
+ {
+ assertValid(() -> execute(excludedUser, select, cl));
+ assertValid(() -> execute(excludedUser, insert, cl));
+ assertValid(() -> execute(excludedUser, batch, cl));
+ }
+
+ // verify local node STUFFED won't reject writes
+ setDiskUsageState(local, STUFFED);
+ assertValid(userSelect);
+ assertWarns(userInsert);
+ assertWarns(userBatch);
+ }
+
+ @Override
+ protected void assertValid(CheckedFunction function) throws Throwable
+ {
+ Guardrails.replicaDiskUsage.resetLastNotifyTime();
+ super.assertValid(function);
+ }
+
+ protected void assertWarns(CheckedFunction function) throws Throwable
+ {
+ Guardrails.replicaDiskUsage.resetLastNotifyTime();
+ super.assertWarns(function, "Replica disk usage exceeds warning threshold");
+ }
+
+ protected void assertFails(CheckedFunction function) throws Throwable
+ {
+ Guardrails.replicaDiskUsage.resetLastNotifyTime();
+ super.assertFails(function, "Write request failed because disk usage exceeds failure threshold");
+ }
+
+ private static void setDiskUsageState(InetAddressAndPort endpoint, DiskUsageState state)
+ {
+ DiskUsageBroadcaster.instance.onChange(endpoint, ApplicationState.DISK_USAGE, value(state));
+ }
+
+ private static VersionedValue value(DiskUsageState state)
+ {
+ return StorageService.instance.valueFactory.diskUsage(state.name());
+ }
+
+ private void assertMonitorStateTransition(double usageRatio, DiskUsageState state, DiskUsageMonitor monitor)
+ {
+ assertMonitorStateTransition(usageRatio, state, monitor, false, null);
+ }
+
+ private void assertMonitorStateTransition(double usageRatio, DiskUsageState state, DiskUsageMonitor monitor,
+ boolean isWarn, String msg)
+ {
+ boolean stateChanged = state != monitor.state();
+ Consumer<DiskUsageState> notifier = newState -> {
+ if (stateChanged)
+ assertEquals(state, newState);
+ else
+ fail("Expect no notification if state remains the same");
+ };
+
+ monitor.updateLocalState(usageRatio, notifier);
+ assertEquals(state, monitor.state());
+
+ if (msg == null)
+ {
+ listener.assertNotFailed();
+ listener.assertNotWarned();
+ }
+ else if (isWarn)
+ {
+ listener.assertWarned(msg);
+ listener.assertNotFailed();
+ }
+ else
+ {
+ listener.assertFailed(msg);
+ listener.assertNotWarned();
+ }
+
+ listener.clear();
+ }
+}
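
The expected ratios asserted at the top of this section follow directly from dividing the simulated 50 GiB of used space by the maximum disk size configured via setDataDiskUsageMaxDiskSize, rounded to five decimal places. A minimal standalone sketch of that arithmetic (not part of this patch; the rounding mode is an assumption, chosen only to reproduce the asserted values):

    // Sketch only: reproduces the ratios asserted in the test above.
    import java.math.BigDecimal;
    import java.math.RoundingMode;

    public class DiskUsageRatioSketch
    {
        private static final long GIB = 1024L * 1024L * 1024L;

        // Hypothetical helper: used/max rounded to 5 decimal places.
        static double usageRatio(long usedBytes, long maxBytes)
        {
            return BigDecimal.valueOf((double) usedBytes / maxBytes)
                             .setScale(5, RoundingMode.HALF_UP)
                             .doubleValue();
        }

        public static void main(String[] args)
        {
            System.out.println(usageRatio(50 * GIB, 300 * GIB)); // 0.16667
            System.out.println(usageRatio(50 * GIB, 75 * GIB));  // 0.66667
            System.out.println(usageRatio(50 * GIB, 49 * GIB));  // 1.02041
        }
    }
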
diff --git a/test/unit/org/apache/cassandra/db/guardrails/GuardrailTester.java b/test/unit/org/apache/cassandra/db/guardrails/GuardrailTester.java
index 74747bbe5a..986e1d698f 100644
--- a/test/unit/org/apache/cassandra/db/guardrails/GuardrailTester.java
+++ b/test/unit/org/apache/cassandra/db/guardrails/GuardrailTester.java
@@ -427,6 +427,11 @@ public abstract class GuardrailTester extends CQLTester
.collect(Collectors.toList());
}
+ protected void assertConfigValid(Consumer<Guardrails> consumer)
+ {
+ consumer.accept(guardrails());
+ }
+
protected void assertConfigFails(Consumer<Guardrails> consumer, String message)
{
try
@@ -533,6 +538,11 @@ public abstract class GuardrailTester extends CQLTester
assertTrue(format("Expect no warning diagnostic events but got
%s", warnings), warnings.isEmpty());
}
+ public void assertWarned(String message)
+ {
+ assertWarned(Collections.singletonList(message));
+ }
+
public void assertWarned(List<String> messages)
{
assertFalse("Expected to emit warning diagnostic event, but no
warning was emitted", warnings.isEmpty());
diff --git a/test/unit/org/apache/cassandra/db/guardrails/GuardrailsTest.java b/test/unit/org/apache/cassandra/db/guardrails/GuardrailsTest.java
index b98659e9a5..b278e7f6f8 100644
--- a/test/unit/org/apache/cassandra/db/guardrails/GuardrailsTest.java
+++ b/test/unit/org/apache/cassandra/db/guardrails/GuardrailsTest.java
@@ -293,6 +293,52 @@ public class GuardrailsTest extends GuardrailTester
assertValid(() -> disallowed.guard(set(4), action, superClientState));
}
+ @Test
+ public void testPredicates() throws Throwable
+ {
+ Predicates<Integer> guard = new Predicates<>("x",
+ state -> x -> x > 10,
+ state -> x -> x > 100,
+ (isWarn, value) -> format("%s: %s", isWarn ? "Warning" : "Aborting", value));
+
+ assertValid(() -> guard.guard(5, userClientState));
+ assertWarns(() -> guard.guard(25, userClientState), "Warning: 25");
+ assertWarns(() -> guard.guard(100, userClientState), "Warning: 100");
+ assertFails(() -> guard.guard(101, userClientState), "Aborting: 101");
+ assertFails(() -> guard.guard(200, userClientState), "Aborting: 200");
+ assertValid(() -> guard.guard(5, userClientState));
+ }
+
+ @Test
+ public void testPredicatesUsers() throws Throwable
+ {
+ Predicates<Integer> guard = new Predicates<>("x",
+ state -> x -> x > 10,
+ state -> x -> x > 100,
+ (isWarn, value) -> format("%s: %s", isWarn ? "Warning" : "Aborting", value));
+
+ assertTrue(guard.enabled());
+ assertTrue(guard.enabled(null));
+ assertTrue(guard.enabled(userClientState));
+ assertFalse(guard.enabled(systemClientState));
+ assertFalse(guard.enabled(superClientState));
+
+ assertValid(() -> guard.guard(5, null));
+ assertValid(() -> guard.guard(5, userClientState));
+ assertValid(() -> guard.guard(5, systemClientState));
+ assertValid(() -> guard.guard(5, superClientState));
+
+ assertWarns(() -> guard.guard(25, null), "Warning: 25");
+ assertWarns(() -> guard.guard(25, userClientState), "Warning: 25");
+ assertValid(() -> guard.guard(25, systemClientState));
+ assertValid(() -> guard.guard(25, superClientState));
+
+ assertFails(() -> guard.guard(101, null), false, "Aborting: 101");
+ assertFails(() -> guard.guard(101, userClientState), "Aborting: 101");
+ assertValid(() -> guard.guard(101, systemClientState));
+ assertValid(() -> guard.guard(101, superClientState));
+ }
+
private static Set<Integer> set(Integer value)
{
return Collections.singleton(value);
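
The two tests above pin down the Predicates guard contract: the first factory produces the warn predicate, the second the fail predicate, and the message function is told which of the two fired, while system and superuser client states are exempt. A minimal sketch of the same pattern at a call site (illustrative only, reusing the constructor shape shown in the test; the variable names are hypothetical):

    // Warn above 10, fail above 100, mirroring the fixture used in both tests.
    Predicates<Integer> sketchGuard = new Predicates<>("sketch",
                                                       state -> v -> v > 10,
                                                       state -> v -> v > 100,
                                                       (isWarn, value) -> String.format("%s: %s", isWarn ? "Warning" : "Aborting", value));

    sketchGuard.guard(5, userClientState);    // passes silently
    sketchGuard.guard(25, userClientState);   // emits "Warning: 25"
    sketchGuard.guard(101, userClientState);  // aborts with "Aborting: 101"
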
diff --git a/test/unit/org/apache/cassandra/db/guardrails/ThresholdTester.java b/test/unit/org/apache/cassandra/db/guardrails/ThresholdTester.java
index 885f626881..a04cc9933b 100644
--- a/test/unit/org/apache/cassandra/db/guardrails/ThresholdTester.java
+++ b/test/unit/org/apache/cassandra/db/guardrails/ThresholdTester.java
@@ -21,13 +21,14 @@ package org.apache.cassandra.db.guardrails;
import java.util.Collections;
import java.util.List;
import java.util.function.BiConsumer;
+import java.util.function.Function;
import java.util.function.ToIntFunction;
import java.util.function.ToLongFunction;
import org.junit.Before;
import org.junit.Test;
-import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DataStorageSpec;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.assertj.core.api.Assertions;
@@ -47,7 +48,7 @@ public abstract class ThresholdTester extends GuardrailTester
private final ToLongFunction<Guardrails> warnGetter;
private final ToLongFunction<Guardrails> failGetter;
private final long maxValue;
- private final long disabledValue;
+ private final Long disabledValue;
protected ThresholdTester(int warnThreshold,
int failThreshold,
@@ -63,7 +64,7 @@ public abstract class ThresholdTester extends GuardrailTester
this.warnGetter = g -> (long) warnGetter.applyAsInt(g);
this.failGetter = g -> (long) failGetter.applyAsInt(g);
maxValue = Integer.MAX_VALUE;
- disabledValue = Config.DISABLED_GUARDRAIL;
+ disabledValue = -1L;
}
protected ThresholdTester(long warnThreshold,
@@ -80,7 +81,24 @@ public abstract class ThresholdTester extends GuardrailTester
this.warnGetter = warnGetter;
this.failGetter = failGetter;
maxValue = Long.MAX_VALUE;
- disabledValue = Config.DISABLED_SIZE_GUARDRAIL.toBytes();
+ disabledValue = -1L;
+ }
+
+ protected ThresholdTester(String warnThreshold,
+ String failThreshold,
+ Threshold threshold,
+ TriConsumer<Guardrails, String, String> setter,
+ Function<Guardrails, String> warnGetter,
+ Function<Guardrails, String> failGetter)
+ {
+ super(threshold);
+ this.warnThreshold = new DataStorageSpec(warnThreshold).toBytes();
+ this.failThreshold = new DataStorageSpec(failThreshold).toBytes();
+ this.setter = (g, w, a) -> setter.accept(g, w == null ? null : DataStorageSpec.inBytes(w).toString(), a == null ? null : DataStorageSpec.inBytes(a).toString());
+ this.warnGetter = g -> new DataStorageSpec(warnGetter.apply(g)).toBytes();
+ this.failGetter = g -> new DataStorageSpec(failGetter.apply(g)).toBytes();
+ maxValue = Long.MAX_VALUE;
+ disabledValue = null;
}
protected long currentValue()
@@ -225,7 +243,7 @@ public abstract class ThresholdTester extends GuardrailTester
assertInvalidStrictlyPositiveProperty(setter, Integer.MIN_VALUE, name);
assertInvalidStrictlyPositiveProperty(setter, -2, name);
assertValidProperty(setter, disabledValue);
- assertInvalidStrictlyPositiveProperty(setter, disabledValue == 0 ? -1 : 0, name);
+ assertInvalidStrictlyPositiveProperty(setter, disabledValue == null ? -1 : 0, name);
assertValidProperty(setter, 1L);
assertValidProperty(setter, 2L);
assertValidProperty(setter, maxValue);
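
The new String-based constructor lets size-threshold testers work with human-readable values while the internal bookkeeping stays in bytes. A minimal sketch of the DataStorageSpec round-trip it relies on (illustrative only, using just the calls that appear in the hunk above; the variable names are hypothetical):

    import org.apache.cassandra.config.DataStorageSpec;

    // Parse a human-readable size into bytes for comparisons...
    long warnBytes = new DataStorageSpec("100GiB").toBytes();
    // ...and turn bytes back into a spec string when invoking a Guardrails setter.
    String warnSpec = DataStorageSpec.inBytes(warnBytes).toString();
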
diff --git a/test/unit/org/apache/cassandra/db/virtual/GossipInfoTableTest.java b/test/unit/org/apache/cassandra/db/virtual/GossipInfoTableTest.java
index a2bce7d3d6..3f859dcf99 100644
--- a/test/unit/org/apache/cassandra/db/virtual/GossipInfoTableTest.java
+++ b/test/unit/org/apache/cassandra/db/virtual/GossipInfoTableTest.java
@@ -80,7 +80,7 @@ public class GossipInfoTableTest extends CQLTester
assertThat(entry).isNotEmpty();
UntypedResultSet.Row row = resultSet.one();
- assertThat(row.getColumns().size()).isEqualTo(62);
+ assertThat(row.getColumns().size()).isEqualTo(64);
InetAddressAndPort endpoint = entry.get().getKey();
EndpointState localState = entry.get().getValue();
@@ -109,6 +109,7 @@ public class GossipInfoTableTest extends CQLTester
assertValue(row, "native_address_and_port", localState,
ApplicationState.NATIVE_ADDRESS_AND_PORT);
assertValue(row, "status_with_port", localState,
ApplicationState.STATUS_WITH_PORT);
assertValue(row, "sstable_versions", localState,
ApplicationState.SSTABLE_VERSIONS);
+ assertValue(row, "disk_usage", localState,
ApplicationState.DISK_USAGE);
assertValue(row, "x_11_padding", localState,
ApplicationState.X_11_PADDING);
assertValue(row, "x1", localState, ApplicationState.X1);
assertValue(row, "x2", localState, ApplicationState.X2);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]