This is an automated email from the ASF dual-hosted git repository. jmckenzie pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new ab920c3 Add a Denylist to block reads and writes on specific partition keys ab920c3 is described below commit ab920c30310a8c095ba76b363142b8e74cbf0a0a Author: Josh McKenzie <jmcken...@apache.org> AuthorDate: Fri Sep 17 16:34:04 2021 -0400 Add a Denylist to block reads and writes on specific partition keys Patch by Josh McKenzie, reviewed by Aleksei Zotov and Sumanth Pasupuleti for CASSANDRA-12106 Co-authored by Josh McKenzie <jmcken...@apache.org> Co-authored by Sam Overton --- conf/cassandra.yaml | 30 ++ doc/source/operating/denylisting_partitions.rst | 110 +++++ doc/source/operating/index.rst | 1 + src/java/org/apache/cassandra/config/Config.java | 32 ++ .../cassandra/config/DatabaseDescriptor.java | 98 ++++ .../org/apache/cassandra/db/view/ViewBuilder.java | 2 +- .../org/apache/cassandra/db/view/ViewManager.java | 2 +- .../apache/cassandra/metrics/DenylistMetrics.java | 58 +++ .../org/apache/cassandra/repair/RepairJob.java | 1 + .../apache/cassandra/repair/RepairRunnable.java | 20 +- .../org/apache/cassandra/repair/RepairSession.java | 1 + .../apache/cassandra/schema/PartitionDenylist.java | 535 +++++++++++++++++++++ .../apache/cassandra/schema/SchemaConstants.java | 3 + .../SystemDistributedKeyspace.java | 33 +- .../org/apache/cassandra/service/StorageProxy.java | 185 ++++++- .../cassandra/service/StorageProxyMBean.java | 14 + .../apache/cassandra/service/StorageService.java | 3 + .../service/reads/range/RangeCommands.java | 26 + .../distributed/test/GossipSettlesTest.java | 2 +- .../distributed/test/PartitionDenylistTest.java | 155 ++++++ .../distributed/test/metric/TableMetricTest.java | 2 +- .../config/DatabaseDescriptorRefTest.java | 3 + .../cassandra/config/DatabaseDescriptorTest.java | 22 + .../cassandra/service/PartitionDenylistTest.java | 495 +++++++++++++++++++ 24 files changed, 1800 insertions(+), 33 deletions(-) diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index 
87df25a..65eb385 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -1026,6 +1026,36 @@ slow_query_log_timeout_in_ms: 500 # bound (for example a few nodes with big files). # streaming_connections_per_host: 1 +# Allows denying configurable access (rw/rr) to operations on configured ks, table, and partitions, intended for use by +# operators to manage cluster health vs application access. See CASSANDRA-12106 and CEP-13 for more details. +# enable_partition_denylist: false + +# enable_denylist_writes: true +# enable_denylist_reads: true +# enable_denylist_range_reads: true + +# The interval at which keys in the cache for denylisting will "expire" and async refresh from the backing DB. +# Note: this serves only as a fail-safe, as the usage pattern is expected to be "mutate state, refresh cache" on any +# changes to the underlying denylist entries. See documentation for details. +# denylist_refresh_seconds: 600 + +# In the event of errors on attempting to load the denylist cache, retry on this interval. +# denylist_initial_load_retry_seconds: 5 + +# We cap the number of denylisted keys allowed per table to keep things from growing unbounded. Nodes will warn above +# this limit while allowing new denylisted keys to be inserted. Denied keys are loaded in natural query / clustering +# ordering by partition key in case of overflow. +# denylist_max_keys_per_table: 1000 + +# We cap the total number of denylisted keys allowed in the cluster to keep things from growing unbounded. +# Nodes will warn on initial cache load that there are too many keys and direct the operator to trim down excess +# entries to within the configured limits. +# denylist_max_keys_total: 10000 + +# Since the denylist in many ways serves to protect the health of the cluster from partitions operators have identified +# as being in a bad state, we usually want more robustness than just CL.ONE on operations to/from these tables to +# ensure that these safeguards are in place.
That said, we allow users to configure this if they're so inclined. +# denylist_consistency_level: QUORUM # phi value that must be reached for a host to be marked down. # most users should never need to adjust this. diff --git a/doc/source/operating/denylisting_partitions.rst b/doc/source/operating/denylisting_partitions.rst new file mode 100644 index 0000000..3e70f2d --- /dev/null +++ b/doc/source/operating/denylisting_partitions.rst @@ -0,0 +1,110 @@ +.. Licensed to the Apache Software Foundation (ASF) under one +.. or more contributor license agreements. See the NOTICE file +.. distributed with this work for additional information +.. regarding copyright ownership. The ASF licenses this file +.. to you under the Apache License, Version 2.0 (the +.. "License"); you may not use this file except in compliance +.. with the License. You may obtain a copy of the License at +.. +.. http://www.apache.org/licenses/LICENSE-2.0 +.. +.. Unless required by applicable law or agreed to in writing, software +.. distributed under the License is distributed on an "AS IS" BASIS, +.. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +.. See the License for the specific language governing permissions and +.. limitations under the License. + +Denylisting Partitions +---------------------- + +Due to access patterns and data modeling, sometimes there are specific partitions that are "hot" and can cause instability in a Cassandra cluster. This often occurs when your data model includes many update or insert operations on a single partition, causing the partition to grow very large over time and in turn making it very expensive to read and maintain.
+ +Cassandra supports "denylisting" these problematic partitions so that when clients issue point reads (`SELECT` statements with the partition key specified) or range reads (`SELECT *`, etc., that pull a range of data) that intersect with a blocked partition key, the query will be immediately rejected with an `InvalidQueryException`. + +How to denylist a partition key +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +The ``system_distributed.denylisted_partitions`` table can be used to denylist partitions. There are a couple of ways to interact with and mutate this data. First: directly via CQL by inserting a record with the following details: + +- Keyspace name (ks_name) +- Table name (table_name) +- Partition Key (partition_key) + +The partition key format needs to be in the same form required by ``nodetool getendpoints``. + +Following are several examples for denylisting partition keys in keyspace `ks` and table `table1` for different data types on the primary key `Id`: + + - Id is a simple type - INSERT INTO system_distributed.denylisted_partitions (ks_name, table_name, partition_key) VALUES ('ks','table1','1'); + - Id is a blob - INSERT INTO system_distributed.denylisted_partitions (ks_name, table_name, partition_key) VALUES ('ks','table1','12345f'); + - Id has a colon - INSERT INTO system_distributed.denylisted_partitions (ks_name, table_name, partition_key) VALUES ('ks','table1','1\:2'); + +In the case of composite column partition keys (Key1, Key2): + + - INSERT INTO system_distributed.denylisted_partitions (ks_name, table_name, partition_key) VALUES ('ks', 'table1', 'k11:k21') + +Special considerations +^^^^^^^^^^^^^^^^^^^^^^ +When using the denylist, you want to keep your cache (see below) and CQL data on a replica set as close together as possible so you don't have different nodes in your cluster denying or allowing different keys.
To best achieve this, the workflow for a denylist change (addition or deletion) should `always be as follows`: + +JMX PATH (preferred for single changes): + +1. Call the JMX hook for ``denylistKey()`` with the desired key +2. Double check the cache reloaded with ``isKeyDenylisted()`` +3. Check for warnings about unrecognized keyspace/table combinations, limits, or consistency level. If you get a message about nodes being down and not hitting CL for denylist, recover the downed nodes and then trigger a reload of the cache on each node with ``loadPartitionDenylist()`` + +CQL PATH (preferred for bulk changes): + +1. Mutate the denylisted partition lists via CQL +2. Trigger a reload of the denylist cache on each node via JMX ``loadPartitionDenylist()`` (see below) +3. Check for warnings about lack of availability for a denylist refresh. In the event nodes are down, recover them, then go to 2. + +Due to conditions on known unavailable range slices leading to alert storming on startup, the denylist cache won't load on node start unless it can achieve the configured consistency level in cassandra.yaml, `denylist_consistency_level`. The JMX call to `loadPartitionDenylist` will, however, load the cache regardless of the number of nodes available. This leaves the control for denylisting or not denylisting during degraded cluster states in the hands of the operator. + +Denylisted Partitions Cache +^^^^^^^^^^^^^^^^^^^^^^^^^^^ +Cassandra internally maintains an on-heap cache of denylisted partitions loaded from ``system_distributed.denylisted_partitions``. The values for a table will be automatically repopulated every ``denylist_refresh_seconds`` as specified in the `conf/cassandra.yaml` file, defaulting to 600 seconds, or 10 minutes. Invalid records (unknown keyspaces, tables, or keys) will be ignored and not cached on load. + +The cache can be refreshed in the following ways: + +- During Cassandra node startup +- Via the automatic on-heap cache refresh mechanisms. 
Note: this will occur asynchronously on query after the ``denylist_refresh_seconds`` time is hit. +- Via the JMX command: ``loadPartitionDenylist`` in the ``org.apache.cassandra.service.StorageProxyMBean`` invocation point. + +The cache size is bounded by the following two config properties: + +- denylist_max_keys_per_table +- denylist_max_keys_total + +On cache load, if a table exceeds the value allowed in `denylist_max_keys_per_table`, a warning will be printed to the logs and the remainder of the keys will not be cached. Similarly, if the total allowable size is exceeded, subsequent ks_name + table_name combinations (in clustering / lexicographical order) will be skipped as well, and a warning logged to the server logs. + +Note: given the required workflow of 1) Mutate, 2) Reload cache, the auto-reload property may seem superfluous. It exists to ensure that, should an operator make a mistake and denylist (or undenylist) a key but forget to reload the cache, that intent will be captured on the next cache reload.
+ +JMX Interface +^^^^^^^^^^^^^ + ++----------------------------------------------------------------------------+---------------------------------------------------------------------------------+ +| Command | Effect | ++============================================================================+=================================================================================+ +| loadPartitionDenylist() | Reloads cached denylist from CQL table | ++----------------------------------------------------------------------------+---------------------------------------------------------------------------------+ +| getPartitionDenylistLoadAttempts() | Gets the count of cache reload attempts | ++----------------------------------------------------------------------------+---------------------------------------------------------------------------------+ +| getPartitionDenylistLoadSuccesses() | Gets the count of cache reload successes | ++----------------------------------------------------------------------------+---------------------------------------------------------------------------------+ +| setEnablePartitionDenylist(boolean enabled) | Enables or disables the partition denylisting functionality | ++----------------------------------------------------------------------------+---------------------------------------------------------------------------------+ +| setEnableDenylistWrites(boolean enabled) | Enables or disables write denylisting functionality | ++----------------------------------------------------------------------------+---------------------------------------------------------------------------------+ +| setEnableDenylistReads(boolean enabled) | Enables or disables read denylisting functionality | ++----------------------------------------------------------------------------+---------------------------------------------------------------------------------+ +| setEnableDenylistRangeReads(boolean enabled) | Enables or disables range read denylisting 
functionality | ++----------------------------------------------------------------------------+---------------------------------------------------------------------------------+ +| denylistKey(String keyspace, String table, String partitionKeyAsString) | Adds a specific keyspace, table, and partition key combo to the denylist | ++----------------------------------------------------------------------------+---------------------------------------------------------------------------------+ +| removeDenylistKey(String keyspace, String cf, String partitionKeyAsString) | Removes a specific keyspace, table, and partition key combo from the denylist | ++----------------------------------------------------------------------------+---------------------------------------------------------------------------------+ +| setDenylistMaxKeysPerTable(int value) | Limits count of allowed keys per table in the denylist | ++----------------------------------------------------------------------------+---------------------------------------------------------------------------------+ +| setDenylistMaxKeysTotal(int value) | Limits the total count of allowable denylisted keys in the system | ++----------------------------------------------------------------------------+---------------------------------------------------------------------------------+ +| isKeyDenylisted(String keyspace, String table, String partitionKeyAsString)| Indicates whether the keyspace.table has the input partition key denied | ++----------------------------------------------------------------------------+---------------------------------------------------------------------------------+ diff --git a/doc/source/operating/index.rst b/doc/source/operating/index.rst index 78c7eb6..ed2c71a 100644 --- a/doc/source/operating/index.rst +++ b/doc/source/operating/index.rst @@ -36,4 +36,5 @@ Operating Cassandra metrics security hardware + denylisting_partitions diff --git a/src/java/org/apache/cassandra/config/Config.java 
b/src/java/org/apache/cassandra/config/Config.java index b9b5975..0dc4180 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -532,6 +532,38 @@ public class Config public volatile String auth_read_consistency_level = "LOCAL_QUORUM"; public volatile String auth_write_consistency_level = "EACH_QUORUM"; + /** This feature allows denying access to operations on certain key partitions, intended for use by operators to + * provide another tool to manage cluster health vs application access. See CASSANDRA-12106 and CEP-13 for more details. + */ + public volatile boolean enable_partition_denylist = false; + + public volatile boolean enable_denylist_writes = true; + + public volatile boolean enable_denylist_reads = true; + + public volatile boolean enable_denylist_range_reads = true; + + public int denylist_refresh_seconds = 600; + + public int denylist_initial_load_retry_seconds = 5; + + /** We cap the number of denylisted keys allowed per table to keep things from growing unbounded. Operators will + * receive warnings and only denylist_max_keys_per_table in natural query ordering will be processed on overflow. + */ + public volatile int denylist_max_keys_per_table = 1000; + + /** We cap the total number of denylisted keys allowed in the cluster to keep things from growing unbounded. + * Operators will receive warnings on initial cache load that there are too many keys and be directed to trim + * down the entries to within the configured limits. + */ + public volatile int denylist_max_keys_total = 10000; + + /** Since the denylist in many ways serves to protect the health of the cluster from partitions operators have identified + * as being in a bad state, we usually want more robustness than just CL.ONE on operations to/from these tables to + * ensure that these safeguards are in place. That said, we allow users to configure this if they're so inclined. 
+ */ + public ConsistencyLevel denylist_consistency_level = ConsistencyLevel.QUORUM; + /** * The intial capacity for creating RangeTombstoneList. */ diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 7a3164a..d246fc7 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -3471,6 +3471,104 @@ public class DatabaseDescriptor conf.consecutive_message_errors_threshold = value; } + public static boolean getEnablePartitionDenylist() + { + return conf.enable_partition_denylist; + } + + public static void setEnablePartitionDenylist(boolean enabled) + { + conf.enable_partition_denylist = enabled; + } + + public static boolean getEnableDenylistWrites() + { + return conf.enable_denylist_writes; + } + + public static void setEnableDenylistWrites(boolean enabled) + { + conf.enable_denylist_writes = enabled; + } + + public static boolean getEnableDenylistReads() + { + return conf.enable_denylist_reads; + } + + public static void setEnableDenylistReads(boolean enabled) + { + conf.enable_denylist_reads = enabled; + } + + public static boolean getEnableDenylistRangeReads() + { + return conf.enable_denylist_range_reads; + } + + public static void setEnableDenylistRangeReads(boolean enabled) + { + conf.enable_denylist_range_reads = enabled; + } + + public static int getDenylistRefreshSeconds() + { + return conf.denylist_refresh_seconds; + } + + public static void setDenylistRefreshSeconds(int seconds) + { + if (seconds <= 0) + throw new IllegalArgumentException("denylist_refresh_seconds must be a positive integer."); + conf.denylist_refresh_seconds = seconds; + } + + public static int getDenylistInitialLoadRetrySeconds() + { + return conf.denylist_initial_load_retry_seconds; + } + + public static void setDenylistInitialLoadRetrySeconds(int seconds) + { + if (seconds <= 0) + throw new 
IllegalArgumentException("denylist_initial_load_retry_seconds must be a positive integer."); + conf.denylist_initial_load_retry_seconds = seconds; + } + + public static ConsistencyLevel getDenylistConsistencyLevel() + { + return conf.denylist_consistency_level; + } + + public static void setDenylistConsistencyLevel(ConsistencyLevel cl) + { + conf.denylist_consistency_level = cl; + } + + public static int getDenylistMaxKeysPerTable() + { + return conf.denylist_max_keys_per_table; + } + + public static void setDenylistMaxKeysPerTable(int value) + { + if (value <= 0) + throw new IllegalArgumentException("denylist_max_keys_per_table must be a positive integer."); + conf.denylist_max_keys_per_table = value; + } + + public static int getDenylistMaxKeysTotal() + { + return conf.denylist_max_keys_total; + } + + public static void setDenylistMaxKeysTotal(int value) + { + if (value <= 0) + throw new IllegalArgumentException("denylist_max_keys_total must be a positive integer."); + conf.denylist_max_keys_total = value; + } + public static SubnetGroups getClientErrorReportingExclusions() { return conf.client_error_reporting_exclusions; diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilder.java b/src/java/org/apache/cassandra/db/view/ViewBuilder.java index d087224..8c840e9 100644 --- a/src/java/org/apache/cassandra/db/view/ViewBuilder.java +++ b/src/java/org/apache/cassandra/db/view/ViewBuilder.java @@ -41,7 +41,7 @@ import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.locator.RangesAtEndpoint; import org.apache.cassandra.locator.Replicas; -import org.apache.cassandra.repair.SystemDistributedKeyspace; +import org.apache.cassandra.schema.SystemDistributedKeyspace; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; diff --git a/src/java/org/apache/cassandra/db/view/ViewManager.java 
b/src/java/org/apache/cassandra/db/view/ViewManager.java index 7e3ea1b..111f96a 100644 --- a/src/java/org/apache/cassandra/db/view/ViewManager.java +++ b/src/java/org/apache/cassandra/db/view/ViewManager.java @@ -32,7 +32,7 @@ import org.apache.cassandra.schema.TableId; import org.apache.cassandra.schema.ViewMetadata; import org.apache.cassandra.db.*; import org.apache.cassandra.db.partitions.*; -import org.apache.cassandra.repair.SystemDistributedKeyspace; +import org.apache.cassandra.schema.SystemDistributedKeyspace; import org.apache.cassandra.schema.Views; import org.apache.cassandra.service.StorageService; diff --git a/src/java/org/apache/cassandra/metrics/DenylistMetrics.java b/src/java/org/apache/cassandra/metrics/DenylistMetrics.java new file mode 100644 index 0000000..0372787 --- /dev/null +++ b/src/java/org/apache/cassandra/metrics/DenylistMetrics.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.metrics; + +import com.codahale.metrics.Meter; + +import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; + +public class DenylistMetrics +{ + private final Meter writesRejected; + private final Meter readsRejected; + private final Meter rangeReadsRejected; + private final Meter totalRequestsRejected; + + public DenylistMetrics() + { + final MetricNameFactory factory = new DefaultNameFactory("StorageProxy", "PartitionDenylist"); + writesRejected = Metrics.meter(factory.createMetricName("WriteRejected")); + readsRejected = Metrics.meter(factory.createMetricName("ReadRejected")); + rangeReadsRejected = Metrics.meter(factory.createMetricName("RangeReadRejected")); + totalRequestsRejected = Metrics.meter(factory.createMetricName("TotalRejected")); + } + + public void incrementWritesRejected() + { + writesRejected.mark(); + totalRequestsRejected.mark(); + } + + public void incrementReadsRejected() + { + readsRejected.mark(); + totalRequestsRejected.mark(); + } + + public void incrementRangeReadsRejected() + { + rangeReadsRejected.mark(); + totalRequestsRejected.mark(); + } +} \ No newline at end of file diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java index 1e203c4..a57ca84 100644 --- a/src/java/org/apache/cassandra/repair/RepairJob.java +++ b/src/java/org/apache/cassandra/repair/RepairJob.java @@ -39,6 +39,7 @@ import org.apache.cassandra.repair.asymmetric.HostDifferences; import org.apache.cassandra.repair.asymmetric.PreferedNodeFilter; import org.apache.cassandra.repair.asymmetric.ReduceHelper; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.schema.SystemDistributedKeyspace; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.dht.Range; diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java 
b/src/java/org/apache/cassandra/repair/RepairRunnable.java index faa0a74..30244a7 100644 --- a/src/java/org/apache/cassandra/repair/RepairRunnable.java +++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java @@ -36,11 +36,6 @@ import org.slf4j.LoggerFactory; import com.codahale.metrics.Timer; import org.apache.cassandra.concurrent.ExecutorPlus; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.exceptions.RepairException; -import org.apache.cassandra.metrics.RepairMetrics; -import org.apache.cassandra.gms.FailureDetector; -import org.apache.cassandra.repair.consistent.SyncStatSummary; -import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.cql3.QueryOptions; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.UntypedResultSet; @@ -50,6 +45,7 @@ import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.exceptions.RepairException; import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.locator.EndpointsForRange; import org.apache.cassandra.locator.InetAddressAndPort; @@ -60,8 +56,11 @@ import org.apache.cassandra.repair.consistent.CoordinatorSession; import org.apache.cassandra.repair.consistent.SyncStatSummary; import org.apache.cassandra.repair.messages.RepairOption; import org.apache.cassandra.schema.SchemaConstants; -import org.apache.cassandra.service.*; +import org.apache.cassandra.schema.SystemDistributedKeyspace; +import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.service.ActiveRepairService.ParentRepairStatus; +import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.service.StorageService; import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.tracing.TraceKeyspace; import org.apache.cassandra.tracing.TraceState; @@ -73,19 +72,18 @@ import 
org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Throwables; import org.apache.cassandra.utils.UUIDGen; import org.apache.cassandra.utils.WrappedRunnable; +import org.apache.cassandra.utils.concurrent.Future; +import org.apache.cassandra.utils.concurrent.FutureCombiner; +import org.apache.cassandra.utils.concurrent.ImmediateFuture; import org.apache.cassandra.utils.progress.ProgressEvent; import org.apache.cassandra.utils.progress.ProgressEventNotifier; import org.apache.cassandra.utils.progress.ProgressEventType; import org.apache.cassandra.utils.progress.ProgressListener; +import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory; import static org.apache.cassandra.service.QueryState.forInternalCalls; import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis; import static org.apache.cassandra.utils.Clock.Global.nanoTime; -import org.apache.cassandra.utils.concurrent.Future; -import org.apache.cassandra.utils.concurrent.FutureCombiner; -import org.apache.cassandra.utils.concurrent.ImmediateFuture; - -import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory; public class RepairRunnable implements Runnable, ProgressEventNotifier { diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java index 1c7d6c9..2e2b36b 100644 --- a/src/java/org/apache/cassandra/repair/RepairSession.java +++ b/src/java/org/apache/cassandra/repair/RepairSession.java @@ -50,6 +50,7 @@ import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.repair.consistent.ConsistentSession; import org.apache.cassandra.repair.consistent.LocalSession; import org.apache.cassandra.repair.consistent.LocalSessions; +import org.apache.cassandra.schema.SystemDistributedKeyspace; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.streaming.SessionSummary; 
diff --git a/src/java/org/apache/cassandra/schema/PartitionDenylist.java b/src/java/org/apache/cassandra/schema/PartitionDenylist.java new file mode 100644 index 0000000..c9aee97 --- /dev/null +++ b/src/java/org/apache/cassandra/schema/PartitionDenylist.java @@ -0,0 +1,535 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.schema; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.NavigableSet; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.ImmutableSortedSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.benmanes.caffeine.cache.CacheLoader; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; +import org.apache.cassandra.concurrent.ScheduledExecutors; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.UntypedResultSet; +import org.apache.cassandra.db.PartitionPosition; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.exceptions.RequestExecutionException; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.service.reads.range.RangeCommands; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.NoSpamLogger; + +import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory; +import static org.apache.cassandra.cql3.QueryProcessor.process; +import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis; + +/** + * PartitionDenylist uses the system_distributed.partition_denylist table to maintain a list of denylisted partition keys + * for each keyspace/table. + * + * Keys can be entered manually into the partition_denylist table or via the JMX operation StorageProxyMBean.denylistKey + * + * The denylist is stored as one CQL partition per table, and the denylisted keys are column names in that partition. 
The denylisted + * keys for each table are cached in memory, and reloaded from the partition_denylist table every 10 minutes (default) or when + * StorageProxyMBean.loadPartitionDenylist is called via JMX. + * + * Concurrency of the cache is provided by the concurrency semantics of the Caffeine LoadingCache. All values (DenylistEntry) are + * immutable collections of keys/tokens which are replaced in whole when the cache refreshes from disk. + * + * The CL for the denylist is used on initial node load as well as on timer instigated cache refreshes. A JMX call by the + * operator to load the denylist cache will warn on CL unavailability but go through with the denylist load. This is to + * allow operators flexibility in the face of degraded cluster state and still grant them the ability to mutate the denylist + * cache and bring it up if there are things they need to block on startup. + * + * Notably, in the current design it's possible for a table *cache expiration instigated* reload to end up violating the + * contract on total denylisted keys allowed in the case where it initially loads with a value less than the DBD + * allowable max per table limit due to global constraint enforcement on initial load. Our load and reload functions + * simply enforce the *per table* limit without consideration to what that entails at the global key level. While we + * could track the constrained state and count in DenylistEntry, for now the complexity doesn't seem to justify the + * protection against that edge case. The enforcement should take place on a user-instigated full reload as well as + * error messaging about count violations, so this only applies to situations in which someone adds a key and doesn't + * actively tell the cache to fully reload to take that key into consideration, which one could reasonably expect to be + * an antipattern.
+ */ +public class PartitionDenylist +{ + private static final Logger logger = LoggerFactory.getLogger(PartitionDenylist.class); + private static final NoSpamLogger AVAILABILITY_LOGGER = NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES); + + private final ExecutorService executor = executorFactory().pooled("DenylistCache", 2); + + /** We effectively don't use our initial empty cache to denylist until the {@link #load()} call which will replace it */ + private volatile LoadingCache<TableId, DenylistEntry> denylist = buildEmptyCache(); + + /** Denylist entry is never mutated once constructed, only replaced with a new entry when the cache is refreshed */ + private static class DenylistEntry + { + public final ImmutableSet<ByteBuffer> keys; + public final ImmutableSortedSet<Token> tokens; + + public DenylistEntry() + { + keys = ImmutableSet.of(); + tokens = ImmutableSortedSet.of(); + } + + public DenylistEntry(final ImmutableSet<ByteBuffer> keys, final ImmutableSortedSet<Token> tokens) + { + this.keys = keys; + this.tokens = tokens; + } + } + + /** synchronized on this */ + private int loadAttempts = 0; + + /** synchronized on this */ + private int loadSuccesses = 0; + + public synchronized int getLoadAttempts() + { + return loadAttempts; + } + public synchronized int getLoadSuccesses() + { + return loadSuccesses; + } + + /** + * Performs initial load of the partition denylist. Should be called at startup and only loads if the operation + * is expected to succeed. If it is not possible to load at call time, a timer is set to retry. + */ + public void initialLoad() + { + if (!DatabaseDescriptor.getEnablePartitionDenylist()) + return; + + synchronized (this) + { + loadAttempts++; + } + + // Check if there are sufficient nodes to attempt reading all the denylist partitions before issuing the query. + // The pre-check prevents definite range-slice unavailables being marked and triggering an alert. 
Nodes may still change + // state between the check and the query, but it should significantly reduce the alert volume. + String retryReason = "Insufficient nodes"; + try + { + if (checkDenylistNodeAvailability()) + { + load(); + return; + } + } + catch (Throwable tr) + { + logger.error("Failed to load partition denylist", tr); + retryReason = "Exception"; + } + + // This path will also be taken on other failures other than UnavailableException, + // but seems like a good idea to retry anyway. + int retryInSeconds = DatabaseDescriptor.getDenylistInitialLoadRetrySeconds(); + logger.info("{} while loading partition denylist cache. Scheduled retry in {} seconds.", retryReason, retryInSeconds); + ScheduledExecutors.optionalTasks.schedule(this::initialLoad, retryInSeconds, TimeUnit.SECONDS); + } + + private boolean checkDenylistNodeAvailability() + { + boolean sufficientNodes = RangeCommands.sufficientLiveNodesForSelectStar(SystemDistributedKeyspace.PartitionDenylistTable, + DatabaseDescriptor.getDenylistConsistencyLevel()); + if (!sufficientNodes) + { + AVAILABILITY_LOGGER.warn("Attempting to load denylist and not enough nodes are available for a {} refresh. 
Reload the denylist when unavailable nodes are recovered to ensure your denylist remains in sync.", + DatabaseDescriptor.getDenylistConsistencyLevel()); + } + return sufficientNodes; + } + + /** Helper method as we need to both build cache on initial init but also on reload of cache contents and params */ + private LoadingCache<TableId, DenylistEntry> buildEmptyCache() + { + // We rely on details of .refreshAfterWrite to reload this async in the background when it's hit: + // https://github.com/ben-manes/caffeine/wiki/Refresh + return Caffeine.newBuilder() + .refreshAfterWrite(DatabaseDescriptor.getDenylistRefreshSeconds(), TimeUnit.SECONDS) + .executor(executor) + .build(new CacheLoader<TableId, DenylistEntry>() + { + @Override + public DenylistEntry load(final TableId tid) + { + // We load whether or not the CL required count are available as the alternative is an + // empty denylist. This allows operators to intervene in the event they need to deny or + // undeny a specific partition key around a node recovery. + checkDenylistNodeAvailability(); + return getDenylistForTableFromCQL(tid); + } + + // The synchronous reload method defaults to being wrapped with a supplyAsync in CacheLoader.asyncReload + @Override + public DenylistEntry reload(final TableId tid, final DenylistEntry oldValue) + { + // Only process when we can hit the user specified CL for the denylist consistency on a timer prompted reload + if (checkDenylistNodeAvailability()) + { + final DenylistEntry newEntry = getDenylistForTableFromCQL(tid); + if (newEntry != null) + return newEntry; + } + if (oldValue != null) + return oldValue; + return new DenylistEntry(); + } + }); + } + + /** + * We need to fully rebuild a new cache to accommodate deleting items from the denylist and potentially shrinking + * the max allowable size in the list. We do not serve queries out of this denylist until it is populated + * so as not to introduce a window of having a partially filled cache allow denylisted entries. 
+ */ + public void load() + { + final long start = currentTimeMillis(); + + final Map<TableId, DenylistEntry> allDenylists = getDenylistForAllTablesFromCQL(); + + // On initial load we have the slight overhead of GC'ing our initial empty cache + LoadingCache<TableId, DenylistEntry> newDenylist = buildEmptyCache(); + newDenylist.putAll(allDenylists); + + synchronized (this) + { + loadSuccesses++; + } + denylist = newDenylist; + logger.info("Loaded partition denylist cache in {}ms", currentTimeMillis() - start); + } + + /** + * We expect the caller to confirm that we are working with a valid keyspace and table. Further, we expect the usage + * pattern of this to be one-off key by key, not in a bulk process, so we reload the entire table's deny list entry + * on an addition or removal. + */ + public boolean addKeyToDenylist(final String keyspace, final String table, final ByteBuffer key) + { + if (!canDenylistKeyspace(keyspace)) + return false; + + final String insert = String.format("INSERT INTO system_distributed.partition_denylist (ks_name, table_name, key) VALUES ('%s', '%s', 0x%s)", + keyspace, table, ByteBufferUtil.bytesToHex(key)); + + try + { + process(insert, DatabaseDescriptor.getDenylistConsistencyLevel()); + return refreshTableDenylist(keyspace, table); + } + catch (final RequestExecutionException e) + { + logger.error("Failed to denylist key [{}] in {}/{}", ByteBufferUtil.bytesToHex(key), keyspace, table, e); + } + return false; + } + + /** + * We expect the caller to confirm that we are working with a valid keyspace and table. 
+ */ + public boolean removeKeyFromDenylist(final String keyspace, final String table, final ByteBuffer key) + { + final String delete = String.format("DELETE FROM system_distributed.partition_denylist " + + "WHERE ks_name = '%s' " + + "AND table_name = '%s' " + + "AND key = 0x%s", + keyspace, table, ByteBufferUtil.bytesToHex(key)); + + try + { + process(delete, DatabaseDescriptor.getDenylistConsistencyLevel()); + return refreshTableDenylist(keyspace, table); + } + catch (final RequestExecutionException e) + { + logger.error("Failed to remove key from denylist: [{}] in {}/{}", ByteBufferUtil.bytesToHex(key), keyspace, table, e); + } + return false; + } + + /** + * We disallow denylisting partitions in certain critical keyspaces to prevent users from making their clusters + * inoperable. + */ + private boolean canDenylistKeyspace(final String keyspace) + { + return !SchemaConstants.DISTRIBUTED_KEYSPACE_NAME.equals(keyspace) && + !SchemaConstants.SYSTEM_KEYSPACE_NAME.equals(keyspace) && + !SchemaConstants.TRACE_KEYSPACE_NAME.equals(keyspace) && + !SchemaConstants.VIRTUAL_SCHEMA.equals(keyspace) && + !SchemaConstants.VIRTUAL_VIEWS.equals(keyspace) && + !SchemaConstants.AUTH_KEYSPACE_NAME.equals(keyspace); + } + + public boolean isKeyPermitted(final String keyspace, final String table, final ByteBuffer key) + { + return isKeyPermitted(getTableId(keyspace, table), key); + } + + public boolean isKeyPermitted(final TableId tid, final ByteBuffer key) + { + final TableMetadata tmd = Schema.instance.getTableMetadata(tid); + + // We have a few quick state checks to get out of the way first; this is hot path so we want to do these first if possible. + if (!DatabaseDescriptor.getEnablePartitionDenylist() || tid == null || tmd == null || !canDenylistKeyspace(tmd.keyspace)) + return true; + + try + { + // If we don't have an entry for this table id, nothing in it is denylisted. 
+ DenylistEntry entry = denylist.get(tid); + if (entry == null) + return true; + return !entry.keys.contains(key); + } + catch (final Exception e) + { + // In the event of an error accessing or populating the cache, assume it's not denylisted + logAccessFailure(tid, e); + return true; + } + } + + private void logAccessFailure(final TableId tid, Throwable e) + { + final TableMetadata tmd = Schema.instance.getTableMetadata(tid); + if (tmd == null) + logger.debug("Failed to access partition denylist cache for unknown table id {}", tid.toString(), e); + else + logger.debug("Failed to access partition denylist cache for {}/{}", tmd.keyspace, tmd.name, e); + } + + /** + * @return number of denylisted keys in range + */ + public int getDeniedKeysInRangeCount(final String keyspace, final String table, final AbstractBounds<PartitionPosition> range) + { + return getDeniedKeysInRangeCount(getTableId(keyspace, table), range); + } + + /** + * @return number of denylisted keys in range + */ + public int getDeniedKeysInRangeCount(final TableId tid, final AbstractBounds<PartitionPosition> range) + { + final TableMetadata tmd = Schema.instance.getTableMetadata(tid); + if (!DatabaseDescriptor.getEnablePartitionDenylist() || tid == null || tmd == null || !canDenylistKeyspace(tmd.keyspace)) + return 0; + + try + { + final DenylistEntry denylistEntry = denylist.get(tid); + if (denylistEntry == null || denylistEntry.tokens.size() == 0) + return 0; + final Token startToken = range.left.getToken(); + final Token endToken = range.right.getToken(); + + // Normal case + if (startToken.compareTo(endToken) <= 0 || endToken.isMinimum()) + { + NavigableSet<Token> subSet = denylistEntry.tokens.tailSet(startToken, PartitionPosition.Kind.MIN_BOUND == range.left.kind()); + if (!endToken.isMinimum()) + subSet = subSet.headSet(endToken, PartitionPosition.Kind.MAX_BOUND == range.right.kind()); + return subSet.size(); + } + + // Wrap around case + return denylistEntry.tokens.tailSet(startToken, 
PartitionPosition.Kind.MIN_BOUND == range.left.kind()).size() + + denylistEntry.tokens.headSet(endToken, PartitionPosition.Kind.MAX_BOUND == range.right.kind()).size(); + } + catch (final Exception e) + { + logAccessFailure(tid, e); + return 0; + } + } + + /** + * Get up to the configured allowable limit per table of denylisted keys + */ + private DenylistEntry getDenylistForTableFromCQL(final TableId tid) + { + return getDenylistForTableFromCQL(tid, DatabaseDescriptor.getDenylistMaxKeysPerTable()); + } + + /** + * Attempts to reload the DenylistEntry data from CQL for the given TableId and key count. + * @return null if we do not find the data; allows cache reloader to preserve old value + */ + private DenylistEntry getDenylistForTableFromCQL(final TableId tid, int limit) + { + final TableMetadata tmd = Schema.instance.getTableMetadata(tid); + if (tmd == null) + return null; + + // We attempt to query just over our allowable max keys in order to check whether we have configured data beyond that limit and alert the user if so + final String readDenylist = String.format("SELECT * FROM %s.%s WHERE ks_name='%s' AND table_name='%s' LIMIT %d", + SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, + SystemDistributedKeyspace.PARTITION_DENYLIST_TABLE, + tmd.keyspace, + tmd.name, + limit + 1); + + try + { + final UntypedResultSet results = process(readDenylist, DatabaseDescriptor.getDenylistConsistencyLevel()); + + // If there's no data in CQL we want to return an empty DenylistEntry so we don't continue using the old value in the cache + if (results == null || results.isEmpty()) + return new DenylistEntry(); + + if (results.size() > limit) + { + // If our limit is < the standard per table we know we're at a global violation because we've constrained that request limit already. + boolean globalLimit = limit != DatabaseDescriptor.getDenylistMaxKeysPerTable(); + String violationType = globalLimit ? "global" : "per-table"; + int errorLimit = globalLimit ? 
DatabaseDescriptor.getDenylistMaxKeysTotal() : limit; + logger.error("Partition denylist for {}/{} has exceeded the {} allowance of ({}). Remaining keys were ignored; " + + "please reduce the total number of keys denied or increase the denylist_max_keys_per_table param in " + + "cassandra.yaml to avoid inconsistency in denied partitions across nodes.", + tmd.keyspace, + tmd.name, + violationType, + errorLimit); + } + + final Set<ByteBuffer> keys = new HashSet<>(); + final NavigableSet<Token> tokens = new TreeSet<>(); + + int processed = 0; + for (final UntypedResultSet.Row row : results) + { + final ByteBuffer key = row.getBlob("key"); + keys.add(key); + tokens.add(StorageService.instance.getTokenMetadata().partitioner.getToken(key)); + + processed++; + if (processed >= limit) + break; + } + return new DenylistEntry(ImmutableSet.copyOf(keys), ImmutableSortedSet.copyOf(tokens)); + } + catch (final RequestExecutionException e) + { + logger.error("Error reading partition_denylist table for {}/{}. Returning empty list.", tmd.keyspace, tmd.name, e); + return null; + } + } + + /** + * This method relies on {@link #getDenylistForTableFromCQL(TableId, int)} to pull a limited amount of keys + * on a per-table basis from CQL to load into the cache. We need to navigate both respecting the max cache size limit + * as well as respecting the per-table limit. + * @return non-null mapping of TableId to DenylistEntry + */ + private Map<TableId, DenylistEntry> getDenylistForAllTablesFromCQL() + { + // While we warn the user in this case, we continue with the reload anyway. 
+ checkDenylistNodeAvailability(); + + final String allDeniedTables = String.format("SELECT DISTINCT ks_name, table_name FROM %s.%s", + SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, + SystemDistributedKeyspace.PARTITION_DENYLIST_TABLE); + try + { + final UntypedResultSet deniedTableResults = process(allDeniedTables, DatabaseDescriptor.getDenylistConsistencyLevel()); + if (deniedTableResults == null || deniedTableResults.isEmpty()) + return Collections.emptyMap(); + + int totalProcessed = 0 ; + final Map<TableId, DenylistEntry> results = new HashMap<>(); + for (final UntypedResultSet.Row row : deniedTableResults) + { + final String ks = row.getString("ks_name"); + final String table = row.getString("table_name"); + final TableId tid = getTableId(ks, table); + if (DatabaseDescriptor.getDenylistMaxKeysTotal() - totalProcessed <= 0) + { + logger.error("Hit limit on allowable denylisted keys in total. Processed {} total entries. Not adding all entries to denylist for {}/{}." + + " Remove denylist entries in system_distributed.{} or increase your denylist_max_keys_total param in cassandra.yaml.", + totalProcessed, + ks, + table, + SystemDistributedKeyspace.PARTITION_DENYLIST_TABLE); + results.put(tid, new DenylistEntry()); + } + else + { + // Determine whether we can get up to table max or we need a subset at edge condition of max overflow. + int allowedTableRecords = Math.min(DatabaseDescriptor.getDenylistMaxKeysPerTable(), DatabaseDescriptor.getDenylistMaxKeysTotal() - totalProcessed); + DenylistEntry tableDenylist = getDenylistForTableFromCQL(tid, allowedTableRecords); + if (tableDenylist != null) + totalProcessed += tableDenylist.keys.size(); + results.put(tid, tableDenylist); + } + } + return results; + } + catch (final RequestExecutionException e) + { + logger.error("Error reading full partition denylist from " + + SchemaConstants.DISTRIBUTED_KEYSPACE_NAME + "." + SystemDistributedKeyspace.PARTITION_DENYLIST_TABLE + + ". Partition Denylisting will be compromised. 
Exception: " + e); + return Collections.emptyMap(); + } + } + + private boolean refreshTableDenylist(String keyspace, String table) + { + checkDenylistNodeAvailability(); + final TableId tid = getTableId(keyspace, table); + if (tid == null) + { + logger.warn("Got denylist mutation for unknown ks/cf: {}/{}. Skipping refresh.", keyspace, table); + return false; + } + + DenylistEntry newEntry = getDenylistForTableFromCQL(tid); + denylist.put(tid, newEntry); + return true; + } + + private TableId getTableId(final String keyspace, final String table) + { + TableMetadata tmd = Schema.instance.getTableMetadata(keyspace, table); + return tmd == null ? null : tmd.id; + } +} diff --git a/src/java/org/apache/cassandra/schema/SchemaConstants.java b/src/java/org/apache/cassandra/schema/SchemaConstants.java index 7b6b7de..870fa99 100644 --- a/src/java/org/apache/cassandra/schema/SchemaConstants.java +++ b/src/java/org/apache/cassandra/schema/SchemaConstants.java @@ -28,6 +28,9 @@ import com.google.common.collect.ImmutableSet; import org.apache.cassandra.db.Digest; +/** + * When adding new String keyspace names here, double check if it needs to be added to PartitionDenylist.canDenylistKeyspace + */ public final class SchemaConstants { public static final Pattern PATTERN_WORD_CHARS = Pattern.compile("\\w+"); diff --git a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java b/src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java similarity index 95% rename from src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java rename to src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java index 5df38a7..a539686 100644 --- a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java +++ b/src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.cassandra.repair; +package org.apache.cassandra.schema; import java.io.PrintWriter; import java.io.StringWriter; @@ -49,14 +49,8 @@ import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.repair.CommonRange; import org.apache.cassandra.repair.messages.RepairOption; -import org.apache.cassandra.schema.CompactionParams; -import org.apache.cassandra.schema.KeyspaceMetadata; -import org.apache.cassandra.schema.KeyspaceParams; -import org.apache.cassandra.schema.SchemaConstants; -import org.apache.cassandra.schema.TableId; -import org.apache.cassandra.schema.TableMetadata; -import org.apache.cassandra.schema.Tables; import org.apache.cassandra.utils.FBUtilities; import static java.lang.String.format; @@ -69,6 +63,8 @@ public final class SystemDistributedKeyspace { } + public static final String NAME = "system_distributed"; + private static final int DEFAULT_RF = CassandraRelevantProperties.SYSTEM_DISTRIBUTED_DEFAULT_RF.getInt(); private static final Logger logger = LoggerFactory.getLogger(SystemDistributedKeyspace.class); @@ -83,8 +79,9 @@ public final class SystemDistributedKeyspace * gen 3: gc_grace_seconds raised from 0 to 10 days in CASSANDRA-12954 in 3.11.0 * gen 4: compression chunk length reduced to 16KiB, memtable_flush_period_in_ms now unset on all tables in 4.0 * gen 5: add ttl and TWCS to repair_history tables + * gen 6: add denylist table */ - public static final long GENERATION = 5; + public static final long GENERATION = 6; public static final String REPAIR_HISTORY = "repair_history"; @@ -92,6 +89,8 @@ public final class SystemDistributedKeyspace public static final String VIEW_BUILD_STATUS = "view_build_status"; + public static final String PARTITION_DENYLIST_TABLE = "partition_denylist"; + private static final TableMetadata RepairHistory = parse(REPAIR_HISTORY, "Repair history", @@ 
-147,6 +146,16 @@ public final class SystemDistributedKeyspace + "status text," + "PRIMARY KEY ((keyspace_name, view_name), host_id))").build(); + public static final TableMetadata PartitionDenylistTable = + parse(PARTITION_DENYLIST_TABLE, + "Partition keys which have been denied access", + "CREATE TABLE %s (" + + "ks_name text," + + "table_name text," + + "key blob," + + "PRIMARY KEY ((ks_name, table_name), key))") + .build(); + private static TableMetadata.Builder parse(String table, String description, String cql) { return CreateTableStatement.parse(format(cql, table), SchemaConstants.DISTRIBUTED_KEYSPACE_NAME) @@ -156,7 +165,7 @@ public final class SystemDistributedKeyspace public static KeyspaceMetadata metadata() { - return KeyspaceMetadata.create(SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, KeyspaceParams.simple(Math.max(DEFAULT_RF, DatabaseDescriptor.getDefaultKeyspaceRF())), Tables.of(RepairHistory, ParentRepairHistory, ViewBuildStatus)); + return KeyspaceMetadata.create(SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, KeyspaceParams.simple(Math.max(DEFAULT_RF, DatabaseDescriptor.getDefaultKeyspaceRF())), Tables.of(RepairHistory, ParentRepairHistory, ViewBuildStatus, PartitionDenylistTable)); } public static void startParentRepair(UUID parent_id, String keyspaceName, String[] cfnames, RepairOption options) @@ -214,8 +223,8 @@ public final class SystemDistributedKeyspace public static void startRepairs(UUID id, UUID parent_id, String keyspaceName, String[] cfnames, CommonRange commonRange) { - //Don't record repair history if an upgrade is in progress as version 3 nodes generates errors - //due to schema differences + // Don't record repair history if an upgrade is in progress as version 3 nodes generates errors + // due to schema differences boolean includeNewColumns = !Gossiper.instance.hasMajorVersion3Nodes(); InetAddressAndPort coordinator = FBUtilities.getBroadcastAddressAndPort(); diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java index c8495dc..fd20230 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -44,10 +44,6 @@ import com.google.common.cache.CacheLoader; import com.google.common.collect.Iterables; import com.google.common.primitives.Ints; import com.google.common.util.concurrent.Uninterruptibles; - -import org.apache.cassandra.utils.Clock; -import org.apache.cassandra.utils.concurrent.CountDownLatch; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,7 +51,7 @@ import org.apache.cassandra.batchlog.Batch; import org.apache.cassandra.batchlog.BatchlogManager; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.RejectException; +import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.CounterMutation; import org.apache.cassandra.db.DecoratedKey; @@ -67,6 +63,7 @@ import org.apache.cassandra.db.PartitionRangeReadCommand; import org.apache.cassandra.db.ReadCommand; import org.apache.cassandra.db.ReadExecutionController; import org.apache.cassandra.db.ReadResponse; +import org.apache.cassandra.db.RejectException; import org.apache.cassandra.db.SinglePartitionReadCommand; import org.apache.cassandra.db.TruncateRequest; import org.apache.cassandra.db.WriteType; @@ -79,12 +76,12 @@ import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; import org.apache.cassandra.db.rows.RowIterator; import org.apache.cassandra.db.view.ViewUtils; import org.apache.cassandra.dht.Token; -import org.apache.cassandra.exceptions.ReadAbortException; import org.apache.cassandra.exceptions.CasWriteTimeoutException; import org.apache.cassandra.exceptions.CasWriteUnknownResultException; import org.apache.cassandra.exceptions.InvalidRequestException; import 
org.apache.cassandra.exceptions.IsBootstrappingException; import org.apache.cassandra.exceptions.OverloadedException; +import org.apache.cassandra.exceptions.ReadAbortException; import org.apache.cassandra.exceptions.ReadFailureException; import org.apache.cassandra.exceptions.ReadTimeoutException; import org.apache.cassandra.exceptions.RequestFailureException; @@ -109,6 +106,7 @@ import org.apache.cassandra.metrics.CASClientRequestMetrics; import org.apache.cassandra.metrics.CASClientWriteRequestMetrics; import org.apache.cassandra.metrics.ClientRequestMetrics; import org.apache.cassandra.metrics.ClientWriteRequestMetrics; +import org.apache.cassandra.metrics.DenylistMetrics; import org.apache.cassandra.metrics.ReadRepairMetrics; import org.apache.cassandra.metrics.StorageMetrics; import org.apache.cassandra.metrics.ViewWriteMetrics; @@ -118,8 +116,10 @@ import org.apache.cassandra.net.MessageFlag; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.net.RequestCallback; import org.apache.cassandra.net.Verb; +import org.apache.cassandra.schema.PartitionDenylist; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.SchemaConstants; +import org.apache.cassandra.schema.TableId; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.paxos.Commit; import org.apache.cassandra.service.paxos.PaxosState; @@ -131,11 +131,13 @@ import org.apache.cassandra.service.reads.range.RangeCommands; import org.apache.cassandra.service.reads.repair.ReadRepair; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.triggers.TriggerExecutor; +import org.apache.cassandra.utils.Clock; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.MBeanWrapper; import org.apache.cassandra.utils.MonotonicClock; import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.UUIDGen; +import org.apache.cassandra.utils.concurrent.CountDownLatch; import 
org.apache.cassandra.utils.concurrent.UncheckedInterruptedException; import static com.google.common.collect.Iterables.concat; @@ -180,11 +182,14 @@ public class StorageProxy implements StorageProxyMBean private static final ViewWriteMetrics viewWriteMetrics = new ViewWriteMetrics("ViewWrite"); private static final Map<ConsistencyLevel, ClientRequestMetrics> readMetricsMap = new EnumMap<>(ConsistencyLevel.class); private static final Map<ConsistencyLevel, ClientWriteRequestMetrics> writeMetricsMap = new EnumMap<>(ConsistencyLevel.class); + private static final DenylistMetrics denylistMetrics = new DenylistMetrics(); private static final String DISABLE_SERIAL_READ_LINEARIZABILITY_KEY = "cassandra.unsafe.disable-serial-reads-linearizability"; private static final boolean disableSerialReadLinearizability = Boolean.parseBoolean(System.getProperty(DISABLE_SERIAL_READ_LINEARIZABILITY_KEY, "false")); + private static final PartitionDenylist partitionDenylist = new PartitionDenylist(); + private StorageProxy() { } @@ -297,6 +302,13 @@ public class StorageProxy implements StorageProxyMBean { TableMetadata metadata = Schema.instance.validateTable(keyspaceName, cfName); + if (DatabaseDescriptor.getEnablePartitionDenylist() && DatabaseDescriptor.getEnableDenylistWrites() && !partitionDenylist.isKeyPermitted(keyspaceName, cfName, key.getKey())) + { + denylistMetrics.incrementWritesRejected(); + throw new InvalidRequestException(String.format("Unable to CAS write to denylisted partition [0x%s] in %s/%s", + key.toString(), keyspaceName, cfName)); + } + Supplier<Pair<PartitionUpdate, RowIterator>> updateProposer = () -> { // read the current values and check they validate the conditions @@ -1056,6 +1068,25 @@ public class StorageProxy implements StorageProxyMBean long queryStartNanoTime) throws WriteTimeoutException, WriteFailureException, UnavailableException, OverloadedException, InvalidRequestException { + if (DatabaseDescriptor.getEnablePartitionDenylist() && 
DatabaseDescriptor.getEnableDenylistWrites()) + { + for (final IMutation mutation : mutations) + { + for (final TableId tid : mutation.getTableIds()) + { + if (!partitionDenylist.isKeyPermitted(tid, mutation.key().getKey())) + { + denylistMetrics.incrementWritesRejected(); + // While Schema.instance.getTableMetadata() can return a null value, in this case the isKeyPermitted + // call above ensures that we cannot have a null associated tid at this point. + final TableMetadata tmd = Schema.instance.getTableMetadata(tid); + throw new InvalidRequestException(String.format("Unable to write to denylisted partition [0x%s] in %s/%s", + mutation.key().toString(), tmd.keyspace, tmd.name)); + } + } + } + } + Collection<Mutation> augmented = TriggerExecutor.instance.execute(mutations); boolean updatesView = Keyspace.open(mutations.iterator().next().getKeyspaceName()) @@ -1739,6 +1770,19 @@ public class StorageProxy implements StorageProxyMBean throw new IsBootstrappingException(); } + if (DatabaseDescriptor.getEnablePartitionDenylist() && DatabaseDescriptor.getEnableDenylistReads()) + { + for (SinglePartitionReadCommand command : group.queries) + { + if (!partitionDenylist.isKeyPermitted(command.metadata().id, command.partitionKey().getKey())) + { + denylistMetrics.incrementReadsRejected(); + throw new InvalidRequestException(String.format("Unable to read denylisted partition [0x%s] in %s/%s", + command.partitionKey().toString(), command.metadata().keyspace, command.metadata().name)); + } + } + } + return consistencyLevel.isSerialConsistency() ? 
readWithPaxos(group, consistencyLevel, state, queryStartNanoTime) : readRegular(group, consistencyLevel, queryStartNanoTime); @@ -2072,6 +2116,18 @@ public class StorageProxy implements StorageProxyMBean ConsistencyLevel consistencyLevel, long queryStartNanoTime) { + if (DatabaseDescriptor.getEnablePartitionDenylist() && DatabaseDescriptor.getEnableDenylistRangeReads()) + { + final int denylisted = partitionDenylist.getDeniedKeysInRangeCount(command.metadata().id, command.dataRange().keyRange()); + if (denylisted > 0) + { + denylistMetrics.incrementRangeReadsRejected(); + String tokens = command.loggableTokens(); + throw new InvalidRequestException(String.format("Attempted to read a range containing %d denylisted keys in %s/%s." + + " Range read: %s", denylisted, command.metadata().keyspace, command.metadata().name, + tokens)); + } + } return RangeCommands.partitions(command, consistencyLevel, queryStartNanoTime); } @@ -2756,4 +2812,121 @@ public class StorageProxy implements StorageProxyMBean { DatabaseDescriptor.setCheckForDuplicateRowsDuringCompaction(false); } + + public void initialLoadPartitionDenylist() + { + partitionDenylist.initialLoad(); + } + + @Override + public void loadPartitionDenylist() + { + partitionDenylist.load(); + } + + @Override + public int getPartitionDenylistLoadAttempts() + { + return partitionDenylist.getLoadAttempts(); + } + + @Override + public int getPartitionDenylistLoadSuccesses() + { + return partitionDenylist.getLoadSuccesses(); + } + + @Override + public void setEnablePartitionDenylist(boolean enabled) + { + DatabaseDescriptor.setEnablePartitionDenylist(enabled); + } + + @Override + public void setEnableDenylistWrites(boolean enabled) + { + DatabaseDescriptor.setEnableDenylistWrites(enabled); + } + + @Override + public void setEnableDenylistReads(boolean enabled) + { + DatabaseDescriptor.setEnableDenylistReads(enabled); + } + + @Override + public void setEnableDenylistRangeReads(boolean enabled) + { + 
DatabaseDescriptor.setEnableDenylistRangeReads(enabled); + } + + @Override + public void setDenylistMaxKeysPerTable(int value) + { + DatabaseDescriptor.setDenylistMaxKeysPerTable(value); + } + + @Override + public void setDenylistMaxKeysTotal(int value) + { + DatabaseDescriptor.setDenylistMaxKeysTotal(value); + } + + /** + * Actively denies read and write access to the provided Partition Key + * @param keyspace Name of keyspace containing the PK you wish to deny access to + * @param table Name of table containing the PK you wish to deny access to + * @param partitionKeyAsString String representation of the PK you want to deny access to + * @return true if successfully added, false if failure + */ + @Override + public boolean denylistKey(String keyspace, String table, String partitionKeyAsString) + { + if (!Schema.instance.getKeyspaces().contains(keyspace)) + return false; + + final ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(keyspace, table); + if (cfs == null) + return false; + + final ByteBuffer bytes = cfs.metadata.get().partitionKeyType.fromString(partitionKeyAsString); + return partitionDenylist.addKeyToDenylist(keyspace, table, bytes); + } + + /** + * Attempts to remove the provided pk from the ks + table deny list + * @param keyspace Keyspace containing the pk to remove the denylist entry for + * @param table Table containing the pk to remove denylist entry for + * @param partitionKeyAsString String representation of the PK you want to re-allow access to + * @return true if found and removed, false if not + */ + @Override + public boolean removeDenylistKey(String keyspace, String table, String partitionKeyAsString) + { + if (!Schema.instance.getKeyspaces().contains(keyspace)) + return false; + + final ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(keyspace, table); + if (cfs == null) + return false; + + final ByteBuffer bytes = cfs.metadata.get().partitionKeyType.fromString(partitionKeyAsString); + return 
partitionDenylist.removeKeyFromDenylist(keyspace, table, bytes); + } + + /** + * A simple check for operators to determine what the denylisted value for a pk is on a node + */ + public boolean isKeyDenylisted(String keyspace, String table, String partitionKeyAsString) + { + if (!Schema.instance.getKeyspaces().contains(keyspace)) + return false; + + final ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(keyspace, table); + if (cfs == null) + return false; + + final ByteBuffer bytes = cfs.metadata.get().partitionKeyType.fromString(partitionKeyAsString); + return !partitionDenylist.isKeyPermitted(keyspace, table, bytes); + } } diff --git a/src/java/org/apache/cassandra/service/StorageProxyMBean.java b/src/java/org/apache/cassandra/service/StorageProxyMBean.java index 56c27e9..a89b877 100644 --- a/src/java/org/apache/cassandra/service/StorageProxyMBean.java +++ b/src/java/org/apache/cassandra/service/StorageProxyMBean.java @@ -63,6 +63,20 @@ public interface StorageProxyMBean @Deprecated public int getOtcBacklogExpirationInterval(); + + public void loadPartitionDenylist(); + public int getPartitionDenylistLoadAttempts(); + public int getPartitionDenylistLoadSuccesses(); + public void setEnablePartitionDenylist(boolean enabled); + public void setEnableDenylistWrites(boolean enabled); + public void setEnableDenylistReads(boolean enabled); + public void setEnableDenylistRangeReads(boolean enabled); + public boolean denylistKey(String keyspace, String table, String partitionKeyAsString); + public boolean removeDenylistKey(String keyspace, String table, String partitionKeyAsString); + public void setDenylistMaxKeysPerTable(int value); + public void setDenylistMaxKeysTotal(int value); + public boolean isKeyDenylisted(String keyspace, String table, String partitionKeyAsString); + @Deprecated public void setOtcBacklogExpirationInterval(int intervalInMillis); diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java index 65a614a..5015f75 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -114,6 +114,7 @@ import org.apache.cassandra.schema.MigrationManager; import org.apache.cassandra.schema.ReplicationParams; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.SchemaConstants; +import org.apache.cassandra.schema.SystemDistributedKeyspace; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.schema.TableMetadataRef; import org.apache.cassandra.schema.ViewMetadata; @@ -1120,6 +1121,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE { logger.warn("Some data streaming failed. Use nodetool to check bootstrap state and resume. For more, see `nodetool help bootstrap`. {}", SystemKeyspace.getBootstrapState()); } + + StorageProxy.instance.initialLoadPartitionDenylist(); } else { diff --git a/src/java/org/apache/cassandra/service/reads/range/RangeCommands.java b/src/java/org/apache/cassandra/service/reads/range/RangeCommands.java index 3452a35..5b656d7 100644 --- a/src/java/org/apache/cassandra/service/reads/range/RangeCommands.java +++ b/src/java/org/apache/cassandra/service/reads/range/RangeCommands.java @@ -24,10 +24,14 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.DataRange; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.PartitionRangeReadCommand; import org.apache.cassandra.db.partitions.PartitionIterator; +import org.apache.cassandra.exceptions.UnavailableException; import org.apache.cassandra.index.Index; +import org.apache.cassandra.locator.ReplicaPlans; +import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.tracing.Tracing; 
import org.apache.cassandra.utils.FBUtilities; import org.assertj.core.util.VisibleForTesting; @@ -114,4 +118,26 @@ public class RangeCommands return (maxExpectedResults / DatabaseDescriptor.getNumTokens()) / keyspace.getReplicationStrategy().getReplicationFactor().allReplicas; } + + /** + * Added specifically to check for sufficient nodes live to serve partition denylist queries + */ + public static boolean sufficientLiveNodesForSelectStar(TableMetadata metadata, ConsistencyLevel consistency) + { + try + { + Keyspace keyspace = Keyspace.open(metadata.keyspace); + ReplicaPlanIterator rangeIterator = new ReplicaPlanIterator(DataRange.allData(metadata.partitioner).keyRange(), + keyspace, consistency); + + // Called for the side effect of running assureSufficientLiveReplicasForRead. + // Deliberately called with an invalid vnode count in case it is used elsewhere in the future. + rangeIterator.forEachRemaining(r -> ReplicaPlans.forRangeRead(keyspace, consistency, r.range(), -1)); + return true; + } + catch (UnavailableException e) + { + return false; + } + } } diff --git a/test/distributed/org/apache/cassandra/distributed/test/GossipSettlesTest.java b/test/distributed/org/apache/cassandra/distributed/test/GossipSettlesTest.java index 5b9629a..341d854 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/GossipSettlesTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/GossipSettlesTest.java @@ -33,7 +33,7 @@ import org.apache.cassandra.distributed.Cluster; import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.repair.SystemDistributedKeyspace; +import org.apache.cassandra.schema.SystemDistributedKeyspace; import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.service.StorageProxy; import org.apache.cassandra.service.StorageService; diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/PartitionDenylistTest.java b/test/distributed/org/apache/cassandra/distributed/test/PartitionDenylistTest.java new file mode 100644 index 0000000..43b3ef8 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/PartitionDenylistTest.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.distributed.test; + +import java.io.IOException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import com.google.common.util.concurrent.Uninterruptibles; +import org.junit.Assert; +import org.junit.Test; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.service.StorageProxy; +import org.apache.cassandra.service.StorageService; + +import static org.apache.cassandra.distributed.api.Feature.GOSSIP; +import static org.apache.cassandra.distributed.api.Feature.NETWORK; +import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis; + +public class PartitionDenylistTest extends TestBaseImpl +{ + private static final Logger logger = LoggerFactory.getLogger(PartitionDenylistTest.class); + private static final int testReplicationFactor = 3; + + // Create a four node cluster, populate with some denylist entries, stop all + // the nodes, then bring them up one by one, waiting for each node to complete + // startup before starting the next. + // + // On startup each node runs a SELECT * query on the partition denylist table + // to populate the cache. The whole keyspace is unlikely to be available until + // three of the four nodes are started, so the early nodes will go through several + // cycles of failing to retrieve the partition denylist before succeeding. + // + // with({NETWORK,GOSSIP}) is currently required for in-JVM dtests to create + // the distributed system tables. 
+ @Test + public void checkStartupWithoutTriggeringUnavailable() throws IOException, InterruptedException, ExecutionException, TimeoutException + { + int nodeCount = 4; + System.setProperty("cassandra.ring_delay_ms", "5000"); // down from 30s default + System.setProperty("cassandra.consistent.rangemovement", "false"); + System.setProperty("cassandra.consistent.simultaneousmoves.allow", "true"); + try (Cluster cluster = Cluster.build(nodeCount) + .withConfig(config -> config + .with(NETWORK) + .with(GOSSIP) + .set("enable_partition_denylist", true) + .set("denylist_initial_load_retry_seconds", 1)) + .createWithoutStarting()) + { + cluster.forEach(i -> { + i.startup(); + i.runOnInstance(PartitionDenylistTest::waitUntilStarted); + }); + + // Do a cluster-wide check that no unavailables were recorded while the denylist was loaded. + cluster.forEach(i -> i.runOnInstance(PartitionDenylistTest::checkNoUnavailables)); + } + } + + static private void waitUntilStarted() + { + waitUntilStarted(60, TimeUnit.SECONDS); + } + + // To be called inside the instance with runOnInstance + static private void waitUntilStarted(int waitDuration, TimeUnit waitUnits) + { + long deadlineInMillis = currentTimeMillis() + Math.max(1, waitUnits.toMillis(waitDuration)); + while (!StorageService.instance.getOperationMode().equals("NORMAL")) + { + if (currentTimeMillis() >= deadlineInMillis) + { + throw new RuntimeException("Instance did not reach application state NORMAL before timeout"); + } + Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS); + } + } + + // To be called inside the instance with runOnInstance + static private void checkNoUnavailables() + { + long deadlineInMillis = currentTimeMillis() + TimeUnit.SECONDS.toMillis(30); + + while (currentTimeMillis() < deadlineInMillis && + StorageProxy.instance.getPartitionDenylistLoadSuccesses() == 0) + { + Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); + } + + Assert.assertTrue("Partition denylist must have loaded before 
checking unavailables", + StorageProxy.instance.getPartitionDenylistLoadSuccesses() > 0); + } + + // To be called inside the instance with runOnInstance, no nodes are started/stopped + // and not enough nodes are available to succeed, so it should just retry a few times + static private void checkTimerActive() + { + long deadlineInMillis = currentTimeMillis() + TimeUnit.SECONDS.toMillis(30); + + do + { + // Make sure more than two load attempts have happened, + // in case we received a node up event about this node + if (StorageProxy.instance.getPartitionDenylistLoadAttempts() > 2) + { + return; + } + Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); + } while (currentTimeMillis() < deadlineInMillis); + + Assert.fail("Node did not retry loading on timeout in 30s"); + } + + @Test + public void checkTimerRetriesLoad() throws IOException + { + int nodeCount = 3; + + try (Cluster cluster = Cluster.build(nodeCount) + .withConfig(config -> config + .with(NETWORK) + .with(GOSSIP) + .set("enable_partition_denylist", true) + .set("denylist_initial_load_retry_seconds", 1)) + .createWithoutStarting()) + { + // Starting without networking enabled in the hope it doesn't trigger + // node lifecycle events when nodes start up. 
+ cluster.get(1).startup(); + cluster.get(1).runOnInstance(PartitionDenylistTest::checkTimerActive); + } + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/test/metric/TableMetricTest.java b/test/distributed/org/apache/cassandra/distributed/test/metric/TableMetricTest.java index 3997e4a..0e915d3 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/metric/TableMetricTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/metric/TableMetricTest.java @@ -41,7 +41,7 @@ import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.distributed.Cluster; import org.apache.cassandra.distributed.api.IInvokableInstance; import org.apache.cassandra.distributed.test.TestBaseImpl; -import org.apache.cassandra.repair.SystemDistributedKeyspace; +import org.apache.cassandra.schema.SystemDistributedKeyspace; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.SchemaKeyspace; import org.apache.cassandra.tracing.TraceKeyspace; diff --git a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java index 3478fb3..20fc8b5 100644 --- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java +++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java @@ -129,6 +129,7 @@ public class DatabaseDescriptorRefTest "org.apache.cassandra.exceptions.ConfigurationException", "org.apache.cassandra.exceptions.RequestValidationException", "org.apache.cassandra.exceptions.CassandraException", + "org.apache.cassandra.exceptions.InvalidRequestException", "org.apache.cassandra.exceptions.TransportException", "org.apache.cassandra.fql.FullQueryLogger", "org.apache.cassandra.fql.FullQueryLoggerOptions", @@ -152,12 +153,14 @@ public class DatabaseDescriptorRefTest "org.apache.cassandra.io.util.SpinningDiskOptimizationStrategy", "org.apache.cassandra.io.util.PathUtils$IOToLongFunction", 
"org.apache.cassandra.locator.Replica", + "org.apache.cassandra.locator.ReplicaCollection", "org.apache.cassandra.locator.SimpleSeedProvider", "org.apache.cassandra.locator.SeedProvider", "org.apache.cassandra.security.ISslContextFactory", "org.apache.cassandra.security.SSLFactory", "org.apache.cassandra.security.EncryptionContext", "org.apache.cassandra.service.CacheService$CacheType", + "org.apache.cassandra.transport.ProtocolException", "org.apache.cassandra.utils.binlog.BinLogOptions", "org.apache.cassandra.utils.FBUtilities", "org.apache.cassandra.utils.FBUtilities$1", diff --git a/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java b/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java index 065f557..d389a6c 100644 --- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java +++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java @@ -25,6 +25,7 @@ import java.net.NetworkInterface; import java.util.Arrays; import java.util.Collection; import java.util.Enumeration; +import java.util.function.Consumer; import com.google.common.base.Throwables; @@ -37,6 +38,7 @@ import org.apache.cassandra.exceptions.ConfigurationException; import org.assertj.core.api.Assertions; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -592,6 +594,26 @@ public class DatabaseDescriptorTest Assert.assertEquals(1, DatabaseDescriptor.tokensFromString(config.initial_token).size()); } + @Test + public void testDenylistInvalidValuesRejected() + { + DatabaseDescriptor.loadConfig(); + + expectIllegalArgumentException(DatabaseDescriptor::setDenylistRefreshSeconds, 0, "denylist_refresh_seconds must be a positive integer."); + expectIllegalArgumentException(DatabaseDescriptor::setDenylistRefreshSeconds, -1, "denylist_refresh_seconds must be a 
positive integer."); + expectIllegalArgumentException(DatabaseDescriptor::setDenylistMaxKeysPerTable, 0, "denylist_max_keys_per_table must be a positive integer."); + expectIllegalArgumentException(DatabaseDescriptor::setDenylistMaxKeysPerTable, -1, "denylist_max_keys_per_table must be a positive integer."); + expectIllegalArgumentException(DatabaseDescriptor::setDenylistMaxKeysTotal, 0, "denylist_max_keys_total must be a positive integer."); + expectIllegalArgumentException(DatabaseDescriptor::setDenylistMaxKeysTotal, -1, "denylist_max_keys_total must be a positive integer."); + } + + private void expectIllegalArgumentException(Consumer<Integer> c, int val, String expectedMessage) + { + assertThatThrownBy(() -> c.accept(val)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining(expectedMessage); + } + // coordinator read @Test public void testClientLargeReadWarnAndAbortNegative() diff --git a/test/unit/org/apache/cassandra/service/PartitionDenylistTest.java b/test/unit/org/apache/cassandra/service/PartitionDenylistTest.java new file mode 100644 index 0000000..0751036 --- /dev/null +++ b/test/unit/org/apache/cassandra/service/PartitionDenylistTest.java @@ -0,0 +1,495 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.CQLTester; +import org.apache.cassandra.cql3.UntypedResultSet; +import org.apache.cassandra.cql3.statements.schema.CreateTableStatement; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.schema.KeyspaceMetadata; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.Tables; + +import static org.apache.cassandra.cql3.QueryProcessor.process; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +public class PartitionDenylistTest +{ + private final static String ks_cql = "partition_denylist_keyspace"; + + @BeforeClass + public static void init() + { + CQLTester.prepareServer(); + + KeyspaceMetadata schema = KeyspaceMetadata.create(ks_cql, + KeyspaceParams.simple(1), + Tables.of( + CreateTableStatement.parse("CREATE TABLE table1 (" + + "keyone text, " + + "keytwo text, " + + "qux text, " + + "quz text, " + + "foo text, " + + "PRIMARY KEY((keyone, keytwo), qux, quz) ) ", ks_cql).build(), + CreateTableStatement.parse("CREATE TABLE table2 (" + + "keyone text, " + + "keytwo text, " + + "keythree text, " + + "value text, " + + "PRIMARY KEY((keyone, keytwo), keythree) ) ", ks_cql).build(), + CreateTableStatement.parse("CREATE TABLE table3 (" + + "keyone text, " + + "keytwo text, " + + "keythree text, " + + "value text, " + + "PRIMARY KEY((keyone, keytwo), keythree) ) ", ks_cql).build() + )); + Schema.instance.load(schema); + DatabaseDescriptor.setEnablePartitionDenylist(true); + DatabaseDescriptor.setEnableDenylistRangeReads(true); + 
DatabaseDescriptor.setDenylistConsistencyLevel(ConsistencyLevel.ONE); + DatabaseDescriptor.setDenylistRefreshSeconds(1); + StorageService.instance.initServer(0); + } + + @Before + public void setup() + { + DatabaseDescriptor.setEnablePartitionDenylist(true); + resetDenylist(); + + process("INSERT INTO " + ks_cql + ".table1 (keyone, keytwo, qux, quz, foo) VALUES ('aaa', 'bbb', 'ccc', 'ddd', 'v')", ConsistencyLevel.ONE); + process("INSERT INTO " + ks_cql + ".table1 (keyone, keytwo, qux, quz, foo) VALUES ('bbb', 'ccc', 'ccc', 'ddd', 'v')", ConsistencyLevel.ONE); + process("INSERT INTO " + ks_cql + ".table1 (keyone, keytwo, qux, quz, foo) VALUES ('ccc', 'ddd', 'ccc', 'ddd', 'v')", ConsistencyLevel.ONE); + process("INSERT INTO " + ks_cql + ".table1 (keyone, keytwo, qux, quz, foo) VALUES ('ddd', 'eee', 'ccc', 'ddd', 'v')", ConsistencyLevel.ONE); + process("INSERT INTO " + ks_cql + ".table1 (keyone, keytwo, qux, quz, foo) VALUES ('eee', 'fff', 'ccc', 'ddd', 'v')", ConsistencyLevel.ONE); + process("INSERT INTO " + ks_cql + ".table1 (keyone, keytwo, qux, quz, foo) VALUES ('fff', 'ggg', 'ccc', 'ddd', 'v')", ConsistencyLevel.ONE); + process("INSERT INTO " + ks_cql + ".table1 (keyone, keytwo, qux, quz, foo) VALUES ('ggg', 'hhh', 'ccc', 'ddd', 'v')", ConsistencyLevel.ONE); + process("INSERT INTO " + ks_cql + ".table1 (keyone, keytwo, qux, quz, foo) VALUES ('hhh', 'iii', 'ccc', 'ddd', 'v')", ConsistencyLevel.ONE); + process("INSERT INTO " + ks_cql + ".table1 (keyone, keytwo, qux, quz, foo) VALUES ('iii', 'jjj', 'ccc', 'ddd', 'v')", ConsistencyLevel.ONE); + process("INSERT INTO " + ks_cql + ".table1 (keyone, keytwo, qux, quz, foo) VALUES ('jjj', 'kkk', 'ccc', 'ddd', 'v')", ConsistencyLevel.ONE); + + + for (int i = 0; i < 50; i++) + process(String.format("INSERT INTO " + ks_cql + ".table2 (keyone, keytwo, keythree, value) VALUES ('%d', '%d', '%d', '%d')", i, i, i, i), ConsistencyLevel.ONE); + + for (int i = 0; i < 50; i++) + process(String.format("INSERT INTO " + ks_cql + ".table3 
(keyone, keytwo, keythree, value) VALUES ('%d', '%d', '%d', '%d')", i, i, i, i), ConsistencyLevel.ONE); + + denylist("table1", "bbb:ccc"); + refreshList(); + } + + + private static void denylist(String table, final String key) + { + StorageProxy.instance.denylistKey(ks_cql, table, key); + } + + private static void refreshList() + { + StorageProxy.instance.loadPartitionDenylist(); + } + + /** + * @return Whether the *attempt* to remove the denylisted key and refresh succeeded. Doesn't necessarily indicate the key + * was previously blocked and found. + */ + private static boolean removeDenylist(final String ks, final String table, final String key) + { + return StorageProxy.instance.removeDenylistKey(ks, table, key); + } + + @Test + public void testRead() + { + process("SELECT * FROM " + ks_cql + ".table1 WHERE keyone='aaa' and keytwo='bbb'", ConsistencyLevel.ONE); + } + + @Test + public void testReadDenylisted() + { + assertThatThrownBy(() -> process("SELECT * FROM " + ks_cql + ".table1 WHERE keyone='bbb' and keytwo='ccc'", ConsistencyLevel.ONE)) + .isInstanceOf(InvalidRequestException.class) + .hasMessageContaining("Unable to read denylisted partition"); + } + + @Test + public void testIsKeyDenylistedAPI() + { + Assert.assertTrue(StorageProxy.instance.isKeyDenylisted(ks_cql, "table1", "bbb:ccc")); + resetDenylist(); + Assert.assertFalse(StorageProxy.instance.isKeyDenylisted(ks_cql, "table1", "bbb:ccc")); + + // Confirm an add mutates cache state + denylist("table1", "bbb:ccc"); + Assert.assertTrue(StorageProxy.instance.isKeyDenylisted(ks_cql, "table1", "bbb:ccc")); + + // Confirm removal then mutates cache w/out explicit reload + StorageProxy.instance.removeDenylistKey(ks_cql, "table1", "bbb:ccc"); + Assert.assertFalse(StorageProxy.instance.isKeyDenylisted(ks_cql, "table1", "bbb:ccc")); + } + + @Test + public void testReadUndenylisted() + { + resetDenylist(); + process("SELECT * FROM " + ks_cql + ".table1 WHERE keyone='ccc' and keytwo='ddd'", 
ConsistencyLevel.ONE); + } + + @Test + public void testWrite() + { + process("INSERT INTO " + ks_cql + ".table1 (keyone, keytwo, qux, quz, foo) VALUES ('eee', 'fff', 'ccc', 'ddd', 'v')", ConsistencyLevel.ONE); + process("DELETE FROM " + ks_cql + ".table1 WHERE keyone='eee' and keytwo='fff'", ConsistencyLevel.ONE); + } + + @Test + public void testWriteDenylisted() + { + assertThatThrownBy(() -> process("INSERT INTO " + ks_cql + ".table1 (keyone, keytwo, qux, quz, foo) VALUES ('bbb', 'ccc', 'eee', 'fff', 'w')", ConsistencyLevel.ONE)) + .isInstanceOf(InvalidRequestException.class) + .hasMessageContaining("Unable to write to denylisted partition"); + } + + @Test + public void testCASWriteDenylisted() + { + assertThatThrownBy(() -> process("UPDATE " + ks_cql + ".table1 SET foo='w' WHERE keyone='bbb' AND keytwo='ccc' AND qux='eee' AND quz='fff' IF foo='v'", ConsistencyLevel.LOCAL_SERIAL)) + .isInstanceOf(InvalidRequestException.class) + .hasMessageContaining("Unable to CAS write to denylisted partition"); + } + + @Test + public void testWriteUndenylisted() + { + resetDenylist(); + process("INSERT INTO " + ks_cql + ".table1 (keyone, keytwo, qux, quz, foo) VALUES ('bbb', 'ccc', 'eee', 'fff', 'w')", ConsistencyLevel.ONE); + } + + @Test + public void testRangeSlice() + { + UntypedResultSet rows; + rows = process("SELECT * FROM " + ks_cql + ".table1 WHERE token(keyone, keytwo) < token('bbb', 'ccc')", ConsistencyLevel.ONE); + Assert.assertEquals(1, rows.size()); + + // 10 entries total in our table + rows = process("SELECT * FROM " + ks_cql + ".table1 WHERE token(keyone, keytwo) > token('bbb', 'ccc')", ConsistencyLevel.ONE); + Assert.assertEquals(8, rows.size()); + + rows = process("SELECT * FROM " + ks_cql + ".table1 WHERE token(keyone, keytwo) >= token('aaa', 'bbb') and token(keyone, keytwo) < token('bbb', 'ccc')", ConsistencyLevel.ONE); + Assert.assertEquals(1, rows.size()); + + rows = process("SELECT * FROM " + ks_cql + ".table1 WHERE token(keyone, keytwo) > token('bbb', 
'ccc') and token(keyone, keytwo) <= token('ddd', 'eee')", ConsistencyLevel.ONE); + Assert.assertEquals(2, rows.size()); + } + + @Test + public void testRangeDenylisted() + { + assertThatThrownBy(() -> process("SELECT * FROM " + ks_cql + ".table1", ConsistencyLevel.ONE)) + .isInstanceOf(InvalidRequestException.class) + .hasMessageContaining("Attempted to read a range containing 1 denylisted keys"); + } + + @Test + public void testRangeDenylisted2() + { + assertThatThrownBy(() -> process("SELECT * FROM " + ks_cql + ".table1 WHERE token(keyone, keytwo) >= token('aaa', 'bbb') and token (keyone, keytwo) <= token('bbb', 'ccc')", ConsistencyLevel.ONE)) + .isInstanceOf(InvalidRequestException.class) + .hasMessageContaining("Attempted to read a range containing 1 denylisted keys"); + } + + @Test + public void testRangeDenylisted3() + { + assertThatThrownBy(() -> process("SELECT * FROM " + ks_cql + ".table1 WHERE token(keyone, keytwo) >= token('bbb', 'ccc') and token (keyone, keytwo) <= token('ccc', 'ddd')", ConsistencyLevel.ONE)) + .isInstanceOf(InvalidRequestException.class) + .hasMessageContaining("Attempted to read a range containing 1 denylisted keys"); + } + + @Test + public void testRangeDenylisted4() + { + assertThatThrownBy(() -> process("SELECT * FROM " + ks_cql + ".table1 WHERE token(keyone, keytwo) > token('aaa', 'bbb') and token (keyone, keytwo) < token('ccc', 'ddd')", ConsistencyLevel.ONE)) + .isInstanceOf(InvalidRequestException.class) + .hasMessageContaining("Attempted to read a range containing 1 denylisted keys"); + } + + @Test + public void testRangeDenylisted5() + { + assertThatThrownBy(() -> process("SELECT * FROM " + ks_cql + ".table1 WHERE token(keyone, keytwo) > token('aaa', 'bbb')", ConsistencyLevel.ONE)) + .isInstanceOf(InvalidRequestException.class) + .hasMessageContaining("Attempted to read a range containing 1 denylisted keys"); + } + + @Test + public void testRangeDenylisted6() + { + assertThatThrownBy(() -> process("SELECT * FROM " + ks_cql + 
".table1 WHERE token(keyone, keytwo) < token('ddd', 'eee')", ConsistencyLevel.ONE)) + .isInstanceOf(InvalidRequestException.class) + .hasMessageContaining("Attempted to read a range containing 1 denylisted keys"); + } + + @Test + public void testInsertUnknownPKIsGraceful() + { + Assert.assertTrue(StorageProxy.instance.denylistKey(ks_cql, "table1", "hohoho")); + } + + @Test + public void testInsertInvalidTableIsGraceful() + { + Assert.assertFalse(StorageProxy.instance.denylistKey(ks_cql, "asldkfjadlskjf", "alksdjfads")); + } + + @Test + public void testInsertInvalidKSIsGraceful() + { + Assert.assertFalse(StorageProxy.instance.denylistKey("asdklfjas", "asldkfjadlskjf", "alksdjfads")); + } + + @Test + public void testDisabledDenylistThrowsNoExceptions() + { + process(String.format("TRUNCATE TABLE %s.table2", ks_cql), ConsistencyLevel.ONE); + process(String.format("TRUNCATE TABLE %s.table3", ks_cql), ConsistencyLevel.ONE); + denyAllKeys(); + DatabaseDescriptor.setEnablePartitionDenylist(false); + process("INSERT INTO " + ks_cql + ".table1 (keyone, keytwo, qux, quz, foo) VALUES ('bbb', 'ccc', 'eee', 'fff', 'w')", ConsistencyLevel.ONE); + process("SELECT * FROM " + ks_cql + ".table1 WHERE keyone='bbb' and keytwo='ccc'", ConsistencyLevel.ONE); + process("SELECT * FROM " + ks_cql + ".table1", ConsistencyLevel.ONE); + + for (int i = 0; i < 50; i++) + { + process(String.format("INSERT INTO %s.table2 (keyone, keytwo, keythree, value) VALUES ('%s', '%s', '%s', '%s')", ks_cql, i, i, i, i), ConsistencyLevel.ONE); + process(String.format("SELECT * FROM %s.table2 WHERE keyone='%s' and keytwo='%s'", ks_cql, i, i), ConsistencyLevel.ONE); + } + + for (int i = 0; i < 50; i++) + { + process(String.format("INSERT INTO %s.table3 (keyone, keytwo, keythree, value) VALUES ('%s', '%s', '%s', '%s')", ks_cql, i, i, i, i), ConsistencyLevel.ONE); + process(String.format("SELECT * FROM %s.table3 WHERE keyone='%s' and keytwo='%s'", ks_cql, i, i), ConsistencyLevel.ONE); + } + } + + /** + * Want to 
make sure we don't throw anything or explode when people try to remove a key that's not there + */ + @Test + public void testRemoveMissingIsGraceful() + { + confirmDenied("table1", "bbb", "ccc"); + Assert.assertTrue(removeDenylist(ks_cql, "table1", "bbb:ccc")); + + // We expect this to silently not find and succeed at *trying* to remove it + Assert.assertTrue(removeDenylist(ks_cql, "table1", "bbb:ccc")); + refreshList(); + + confirmAllowed("table1", "bbb", "ccc"); + } + + /** + * We need to confirm that the entire cache is reloaded rather than being an additive change; we don't want keys to + * persist after their removal and reload from CQL. + */ + @Test + public void testRemoveWorksOnReload() + { + denyAllKeys(); + refreshList(); + + confirmDenied("table1", "aaa", "bbb"); + confirmDenied("table1", "eee", "fff"); + confirmDenied("table1", "iii", "jjj"); + + // poke a hole in the middle and reload + removeDenylist(ks_cql, "table1", "eee:fff"); + refreshList(); + + confirmAllowed("table1", "eee", "fff"); + confirmDenied("table1", "aaa", "bbb"); + confirmDenied("table1", "iii", "jjj"); + } + + /** + * We go through a few steps here: + * 1) Add more keys than we're allowed + * 2) Confirm that the overflow keys are *not* denied + * 3) Raise the allowable limit + * 4) Confirm that the overflow keys are now denied (and no longer really "overflow" for that matter) + */ + @Test + public void testShrinkAndGrow() + { + denyAllKeys(); + refreshList(); + + // Initial control; check denial of both initial and final keys + confirmDenied("table1", "aaa", "bbb"); + confirmDenied("table1", "iii", "jjj"); + + // Lower our limit to 5 allowable denies and then check and see that things past the limit are ignored + StorageProxy.instance.setDenylistMaxKeysPerTable(5); + StorageProxy.instance.setDenylistMaxKeysTotal(5); + refreshList(); + + // Confirm overflowed keys are allowed; first come first served + confirmDenied("table1", "aaa", "bbb"); + confirmAllowed("table1", "iii", "jjj"); + 
+ // Now we raise the limit back up and do nothing else and confirm it's blocked + StorageProxy.instance.setDenylistMaxKeysPerTable(1000); + StorageProxy.instance.setDenylistMaxKeysTotal(1000); + refreshList(); + confirmDenied("table1", "aaa", "bbb"); + confirmDenied("table1", "iii", "jjj"); + + // Lower the limits back to 5 so the table1 sentinel key (iii:jjj) overflows and is allowed again; we re-check it below + StorageProxy.instance.setDenylistMaxKeysPerTable(5); + StorageProxy.instance.setDenylistMaxKeysTotal(5); + refreshList(); + confirmAllowed("table1", "iii", "jjj"); + + // Now, with the limits still at 5, we remove the denylist entries for our first 5 and confirm the previously overflowed sentinel key (iii:jjj) is now denied + removeDenylist(ks_cql, "table1", "aaa:bbb"); + removeDenylist(ks_cql, "table1", "bbb:ccc"); + removeDenylist(ks_cql, "table1", "ccc:ddd"); + removeDenylist(ks_cql, "table1", "ddd:eee"); + removeDenylist(ks_cql, "table1", "eee:fff"); + refreshList(); + confirmDenied("table1", "iii", "jjj"); + } + + /** + We need to test that, during a violation of our global allowable limit, we still enforce our limit of keys queried + on a per-table basis. + With a per-table limit of 5 and a global limit of 12, the assertions expect 5 denied keys in table1, 5 in table2 and only 2 in table3. 
+ */ + @Test + public void testTableLimitRespected() + { + StorageProxy.instance.setDenylistMaxKeysPerTable(5); + StorageProxy.instance.setDenylistMaxKeysTotal(12); + denyAllKeys(); + refreshList(); + + // Table 1: expect first 5 denied + confirmDenied("table1", "aaa", "bbb"); + confirmDenied("table1", "eee", "fff"); + confirmAllowed("table1", "fff", "ggg"); + + // Table 2: expect first 5 denied + for (int i = 0; i < 5; i++) + confirmDenied("table2", Integer.toString(i), Integer.toString(i)); + + // Confirm remainder are allowed because we hit our table limit at 5 + for (int i = 5; i < 50; i++) + confirmAllowed("table2", Integer.toString(i), Integer.toString(i)); + + // Table 3: expect only first 2 denied; global limit enforcement + confirmDenied("table3", "0", "0"); + confirmDenied("table3", "1", "1"); + + // And our final 48 should be allowed + for (int i = 2; i < 50; i++) + confirmAllowed("table3", Integer.toString(i), Integer.toString(i)); + } + + /** + * Test that we don't allow overflowing global limit due to accumulation of allowable table queries. + * With a per-table limit of 50 and a global limit of 15, all 10 table1 keys and the first 5 table2 keys are expected to be + * denied; every later table2 key and all of table3 are expected to be allowed. + */ + @Test + public void testGlobalLimitRespected() + { + StorageProxy.instance.setDenylistMaxKeysPerTable(50); + StorageProxy.instance.setDenylistMaxKeysTotal(15); + denyAllKeys(); + refreshList(); + + // Table 1: expect all 10 denied + confirmDenied("table1", "aaa", "bbb"); + confirmDenied("table1", "jjj", "kkk"); + + // Table 2: expect only 5 denied up to global limit trigger + for (int i = 0; i < 5; i++) + confirmDenied("table2", Integer.toString(i), Integer.toString(i)); + + // Remainder of Table 2 should be allowed; testing overflow boundary logic + for (int i = 5; i < 50; i++) + confirmAllowed("table2", Integer.toString(i), Integer.toString(i)); + + // Table 3: expect all allowed as we're past global limit by the time we got to this table load. This confirms that + // we bypass load completely on tables once we're at our global limit. 
+ for (int i = 0; i < 50; i++) + confirmAllowed("table3", Integer.toString(i), Integer.toString(i)); + } + + /** Runs a SELECT at ConsistencyLevel.ONE for the given two-part partition key and asserts that it throws InvalidRequestException with a message containing "Unable to read denylisted partition". */ private void confirmDenied(String table, String keyOne, String keyTwo) + { + String query = String.format("SELECT * FROM " + ks_cql + "." + table + " WHERE keyone='%s' and keytwo='%s'", keyOne, keyTwo); + assertThatThrownBy(() -> process(query, ConsistencyLevel.ONE)) + .isInstanceOf(InvalidRequestException.class) + .hasMessageContaining("Unable to read denylisted partition"); + } + + /** Runs the same SELECT at ConsistencyLevel.ONE and makes no assertion on the result; the check passes as long as process(...) does not throw. */ private void confirmAllowed(String table, String keyOne, String keyTwo) + { + process(String.format("SELECT * FROM %s.%s WHERE keyone='%s' and keytwo='%s'", ks_cql, table, keyOne, keyTwo), ConsistencyLevel.ONE); + } + + /** Truncates system_distributed.partition_denylist, sets both the total and per-table key limits to 1000, then calls StorageProxy.instance.loadPartitionDenylist(). */ private void resetDenylist() + { + process("TRUNCATE system_distributed.partition_denylist", ConsistencyLevel.ONE); + StorageProxy.instance.setDenylistMaxKeysTotal(1000); + StorageProxy.instance.setDenylistMaxKeysPerTable(1000); + StorageProxy.instance.loadPartitionDenylist(); + } + + /** Calls denylist(...) for ten table1 keys (aaa:bbb through jjj:kkk) and for keys 0:0 through 49:49 in both table2 and table3. */ private void denyAllKeys() + { + denylist("table1", "aaa:bbb"); + denylist("table1", "bbb:ccc"); + denylist("table1", "ccc:ddd"); + denylist("table1", "ddd:eee"); + denylist("table1", "eee:fff"); + denylist("table1", "fff:ggg"); + denylist("table1", "ggg:hhh"); + denylist("table1", "hhh:iii"); + denylist("table1", "iii:jjj"); + denylist("table1", "jjj:kkk"); + + for (int i = 0; i < 50; i++) + { + denylist("table2", String.format("%d:%d", i, i)); + denylist("table3", String.format("%d:%d", i, i)); + } + + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org