This is an automated email from the ASF dual-hosted git repository.

jmckenzie pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ab920c3  Add a Denylist to block reads and writes on specific 
partition keys
ab920c3 is described below

commit ab920c30310a8c095ba76b363142b8e74cbf0a0a
Author: Josh McKenzie <jmcken...@apache.org>
AuthorDate: Fri Sep 17 16:34:04 2021 -0400

    Add a Denylist to block reads and writes on specific partition keys
    
    Patch by Josh McKenzie, reviewed by Aleksei Zotov and Sumanth Pasupuleti 
for CASSANDRA-12106
    
    Co-authored by Josh McKenzie <jmcken...@apache.org>
    Co-authored by Sam Overton
---
 conf/cassandra.yaml                                |  30 ++
 doc/source/operating/denylisting_partitions.rst    | 110 +++++
 doc/source/operating/index.rst                     |   1 +
 src/java/org/apache/cassandra/config/Config.java   |  32 ++
 .../cassandra/config/DatabaseDescriptor.java       |  98 ++++
 .../org/apache/cassandra/db/view/ViewBuilder.java  |   2 +-
 .../org/apache/cassandra/db/view/ViewManager.java  |   2 +-
 .../apache/cassandra/metrics/DenylistMetrics.java  |  58 +++
 .../org/apache/cassandra/repair/RepairJob.java     |   1 +
 .../apache/cassandra/repair/RepairRunnable.java    |  20 +-
 .../org/apache/cassandra/repair/RepairSession.java |   1 +
 .../apache/cassandra/schema/PartitionDenylist.java | 535 +++++++++++++++++++++
 .../apache/cassandra/schema/SchemaConstants.java   |   3 +
 .../SystemDistributedKeyspace.java                 |  33 +-
 .../org/apache/cassandra/service/StorageProxy.java | 185 ++++++-
 .../cassandra/service/StorageProxyMBean.java       |  14 +
 .../apache/cassandra/service/StorageService.java   |   3 +
 .../service/reads/range/RangeCommands.java         |  26 +
 .../distributed/test/GossipSettlesTest.java        |   2 +-
 .../distributed/test/PartitionDenylistTest.java    | 155 ++++++
 .../distributed/test/metric/TableMetricTest.java   |   2 +-
 .../config/DatabaseDescriptorRefTest.java          |   3 +
 .../cassandra/config/DatabaseDescriptorTest.java   |  22 +
 .../cassandra/service/PartitionDenylistTest.java   | 495 +++++++++++++++++++
 24 files changed, 1800 insertions(+), 33 deletions(-)

diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 87df25a..65eb385 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1026,6 +1026,36 @@ slow_query_log_timeout_in_ms: 500
 # bound (for example a few nodes with big files).
 # streaming_connections_per_host: 1
 
+# Allows denying configurable access (rw/rr) to operations on configured ks, 
table, and partitions, intended for use by
+# operators to manage cluster health vs application access. See 
CASSANDRA-12106 and CEP-13 for more details.
+# enable_partition_denylist: false
+
+# enable_denylist_writes: true
+# enable_denylist_reads: true
+# enable_denylist_range_reads: true
+
+# The interval at which keys in the cache for denylisting will "expire" and 
async refresh from the backing DB.
+# Note: this serves only as a fail-safe, as the usage pattern is expected to 
be "mutate state, refresh cache" on any
+# changes to the underlying denylist entries. See documentation for details.
+# denylist_refresh_seconds: 600
+
+# In the event of errors on attempting to load the denylist cache, retry on 
this interval.
+# denylist_initial_load_retry_seconds: 5
+
+# We cap the number of denylisted keys allowed per table to keep things from 
growing unbounded. Nodes will warn above
+# this limit while allowing new denylisted keys to be inserted. Denied keys 
are loaded in natural query / clustering
+# ordering by partition key in case of overflow.
+# denylist_max_keys_per_table: 1000
+
+# We cap the total number of denylisted keys allowed in the cluster to keep 
things from growing unbounded.
+# Nodes will warn on initial cache load that there are too many keys and direct the operator to trim down excess
+# entries to within the configured limits.
+# denylist_max_keys_total: 10000
+
+# Since the denylist in many ways serves to protect the health of the cluster 
from partitions operators have identified
+# as being in a bad state, we usually want more robustness than just CL.ONE on 
operations to/from these tables to
+# ensure that these safeguards are in place. That said, we allow users to 
configure this if they're so inclined.
+# denylist_consistency_level: QUORUM
 
 # phi value that must be reached for a host to be marked down.
 # most users should never need to adjust this.
diff --git a/doc/source/operating/denylisting_partitions.rst 
b/doc/source/operating/denylisting_partitions.rst
new file mode 100644
index 0000000..3e70f2d
--- /dev/null
+++ b/doc/source/operating/denylisting_partitions.rst
@@ -0,0 +1,110 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+.. or more contributor license agreements.  See the NOTICE file
+.. distributed with this work for additional information
+.. regarding copyright ownership.  The ASF licenses this file
+.. to you under the Apache License, Version 2.0 (the
+.. "License"); you may not use this file except in compliance
+.. with the License.  You may obtain a copy of the License at
+..
+..     http://www.apache.org/licenses/LICENSE-2.0
+..
+.. Unless required by applicable law or agreed to in writing, software
+.. distributed under the License is distributed on an "AS IS" BASIS,
+.. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+.. See the License for the specific language governing permissions and
+.. limitations under the License.
+
+Denylisting Partitions
+----------------------
+
+Due to access patterns and data modeling, sometimes there are specific 
partitions that are "hot" and can cause instability in a Cassandra cluster. 
This often occurs when your data model includes many update or insert 
operations on a single partition, causing the partition to grow very large over 
time and in turn making it very expensive to read and maintain.
+
+Cassandra supports "denylisting" these problematic partitions so that when clients issue point reads (`SELECT` statements with the partition key specified) or range reads (`SELECT *`, etc. that pull a range of data) that intersect with a blocked partition key, the query will be immediately rejected with an `InvalidQueryException`.
+
+How to denylist a partition key
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+The ``system_distributed.denylisted_partitions`` table can be used to denylist 
partitions. There are a couple of ways to interact with and mutate this data. 
First: directly via CQL by inserting a record with the following details:
+
+- Keyspace name (ks_name)
+- Table name (table_name)
+- Partition Key (partition_key)
+
+The partition key format needs to be in the same form required by ``nodetool 
getendpoints``.
+
+Following are several examples for denylisting partition keys in keyspace `ks` 
and table `table1` for different data types on the primary key `Id`:
+
+ - Id is a simple type - INSERT INTO system_distributed.denylisted_partitions 
(ks_name, table_name, partition_key) VALUES ('ks','table1','1');
+ - Id is a blob        - INSERT INTO system_distributed.denylisted_partitions 
(ks_name, table_name, partition_key) VALUES ('ks','table1','12345f');
+ - Id has a colon      - INSERT INTO system_distributed.denylisted_partitions 
(ks_name, table_name, partition_key) VALUES ('ks','table1','1\:2');
+
+In the case of composite column partition keys (Key1, Key2):
+
+ - INSERT INTO system_distributed.denylisted_partitions (ks_name, table_name, 
partition_key) VALUES ('ks', 'table1', 'k11:k21')
+
+Special considerations
+^^^^^^^^^^^^^^^^^^^^^^
+The denylist has the property that you want to keep your cache (see below) and CQL data on a replica set as close together as possible so you don't have different nodes in your cluster denying or allowing different keys. To best achieve this, the workflow for a denylist change (addition or deletion) should `always be as follows`:
+
+JMX PATH (preferred for single changes):
+
+1. Call the JMX hook for ``denylistKey()`` with the desired key
+2. Double check the cache reloaded with ``isKeyDenylisted()``
+3. Check for warnings about unrecognized keyspace/table combinations, limits, 
or consistency level. If you get a message about nodes being down and not 
hitting CL for denylist, recover the downed nodes and then trigger a reload of 
the cache on each node with ``loadPartitionDenylist()``
+
+CQL PATH (preferred for bulk changes):
+
+1. Mutate the denylisted partition lists via CQL
+2. Trigger a reload of the denylist cache on each node via JMX 
``loadPartitionDenylist()`` (see below)
+3. Check for warnings about lack of availability for a denylist refresh. In 
the event nodes are down, recover them, then go to 2.
+
+Due to conditions on known unavailable range slices leading to alert storming 
on startup, the denylist cache won't load on node start unless it can achieve 
the configured consistency level in cassandra.yaml, 
`denylist_consistency_level`. The JMX call to `loadPartitionDenylist` will, 
however, load the cache regardless of the number of nodes available. This 
leaves the control for denylisting or not denylisting during degraded cluster 
states in the hands of the operator.
+
+Denylisted Partitions Cache
+^^^^^^^^^^^^^^^^^^^^^^^^^^^
+Cassandra internally maintains an on-heap cache of denylisted partitions 
loaded from ``system_distributed.denylisted_partitions``. The values for a 
table will be automatically repopulated every ``denylist_refresh_seconds`` as 
specified in the `conf/cassandra.yaml` file, defaulting to 600 seconds, or 10 
minutes. Invalid records (unknown keyspaces, tables, or keys) will be ignored 
and not cached on load.
+
+The cache can be refreshed in the following ways:
+
+- During Cassandra node startup
+- Via the automatic on-heap cache refresh mechanisms. Note: this will occur 
asynchronously on query after the ``denylist_refresh_seconds`` time is hit.
+- Via the JMX command: ``loadPartitionDenylist`` in the ``org.apache.cassandra.service.StorageProxyMBean`` invocation point.
+
+The cache size is bounded by the following two config properties:
+
+- denylist_max_keys_per_table
+- denylist_max_keys_total
+
+On cache load, if a table exceeds the value allowed in `denylist_max_keys_per_table`, a warning will be printed to the logs and the remainder of the keys will not be cached. Similarly, if the total allowable size is exceeded, subsequent ks_name + table_name combinations (in clustering / lexicographical order) will be skipped as well, and a warning logged to the server logs.
+
+Note: given the required workflow of 1) Mutate, 2) Reload cache, the 
auto-reload property seems superfluous. It exists to ensure that, should an 
operator make a mistake and denylist (or undenylist) a key but forget to reload 
the cache, that intent will be captured on the next cache reload.
+
+JMX Interface
+^^^^^^^^^^^^^
+
++----------------------------------------------------------------------------+---------------------------------------------------------------------------------+
+| Command                                                                    | 
Effect                                                                          
|
++============================================================================+=================================================================================+
+| loadPartitionDenylist()                                                    | 
Reloads cached denylist from CQL table                                          
|
++----------------------------------------------------------------------------+---------------------------------------------------------------------------------+
+| getPartitionDenylistLoadAttempts()                                         | 
Gets the count of cache reload attempts                                         
|
++----------------------------------------------------------------------------+---------------------------------------------------------------------------------+
+| getPartitionDenylistLoadSuccesses()                                        | 
Gets the count of cache reload successes                                        
|
++----------------------------------------------------------------------------+---------------------------------------------------------------------------------+
+| setEnablePartitionDenylist(boolean enabled)                                | 
Enables or disables the partition denylisting functionality                     
|
++----------------------------------------------------------------------------+---------------------------------------------------------------------------------+
+| setEnableDenylistWrites(boolean enabled)                                   | 
Enables or disables write denylisting functionality                             
|
++----------------------------------------------------------------------------+---------------------------------------------------------------------------------+
+| setEnableDenylistReads(boolean enabled)                                    | 
Enables or disables read denylisting functionality                              
|
++----------------------------------------------------------------------------+---------------------------------------------------------------------------------+
+| setEnableDenylistRangeReads(boolean enabled)                               | 
Enables or disables range read denylisting functionality                        
|
++----------------------------------------------------------------------------+---------------------------------------------------------------------------------+
+| denylistKey(String keyspace, String table, String partitionKeyAsString)    | 
Adds a specific keyspace, table, and partition key combo to the denylist        
|
++----------------------------------------------------------------------------+---------------------------------------------------------------------------------+
+| removeDenylistKey(String keyspace, String cf, String partitionKeyAsString) | 
Removes a specific keyspace, table, and partition key combo from the denylist   
|
++----------------------------------------------------------------------------+---------------------------------------------------------------------------------+
+| setDenylistMaxKeysPerTable(int value)                                      | 
Limits count of allowed keys per table in the denylist                          
|
++----------------------------------------------------------------------------+---------------------------------------------------------------------------------+
+| setDenylistMaxKeysTotal(int value)                                         | 
Limits the total count of allowable denylisted keys in the system               
|
++----------------------------------------------------------------------------+---------------------------------------------------------------------------------+
+| isKeyDenylisted(String keyspace, String table, String partitionKeyAsString)| 
Indicates whether the keyspace.table has the input partition key denied         
|
++----------------------------------------------------------------------------+---------------------------------------------------------------------------------+
diff --git a/doc/source/operating/index.rst b/doc/source/operating/index.rst
index 78c7eb6..ed2c71a 100644
--- a/doc/source/operating/index.rst
+++ b/doc/source/operating/index.rst
@@ -36,4 +36,5 @@ Operating Cassandra
    metrics
    security
    hardware
+   denylisting_partitions
 
diff --git a/src/java/org/apache/cassandra/config/Config.java 
b/src/java/org/apache/cassandra/config/Config.java
index b9b5975..0dc4180 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -532,6 +532,38 @@ public class Config
     public volatile String auth_read_consistency_level = "LOCAL_QUORUM";
     public volatile String auth_write_consistency_level = "EACH_QUORUM";
 
+    /** This feature allows denying access to operations on certain key 
partitions, intended for use by operators to
+     * provide another tool to manage cluster health vs application access. 
See CASSANDRA-12106 and CEP-13 for more details.
+     */
+    public volatile boolean enable_partition_denylist = false;
+
+    public volatile boolean enable_denylist_writes = true;
+
+    public volatile boolean enable_denylist_reads = true;
+
+    public volatile boolean enable_denylist_range_reads = true;
+
+    public int denylist_refresh_seconds = 600;
+
+    public int denylist_initial_load_retry_seconds = 5;
+
+    /** We cap the number of denylisted keys allowed per table to keep things 
from growing unbounded. Operators will
+     * receive warnings and only denylist_max_keys_per_table in natural query 
ordering will be processed on overflow.
+     */
+    public volatile int denylist_max_keys_per_table = 1000;
+
+    /** We cap the total number of denylisted keys allowed in the cluster to 
keep things from growing unbounded.
+     * Operators will receive warnings on initial cache load that there are 
too many keys and be directed to trim
+     * down the entries to within the configured limits.
+     */
+    public volatile int denylist_max_keys_total = 10000;
+
+    /** Since the denylist in many ways serves to protect the health of the 
cluster from partitions operators have identified
+     * as being in a bad state, we usually want more robustness than just 
CL.ONE on operations to/from these tables to
+     * ensure that these safeguards are in place. That said, we allow users to 
configure this if they're so inclined.
+     */
+    public ConsistencyLevel denylist_consistency_level = 
ConsistencyLevel.QUORUM;
+
     /**
      * The intial capacity for creating RangeTombstoneList.
      */
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 7a3164a..d246fc7 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -3471,6 +3471,104 @@ public class DatabaseDescriptor
         conf.consecutive_message_errors_threshold = value;
     }
 
+    public static boolean getEnablePartitionDenylist()
+    {
+        return conf.enable_partition_denylist;
+    }
+
+    public static void setEnablePartitionDenylist(boolean enabled)
+    {
+        conf.enable_partition_denylist = enabled;
+    }
+
+    public static boolean getEnableDenylistWrites()
+    {
+        return conf.enable_denylist_writes;
+    }
+
+    public static void setEnableDenylistWrites(boolean enabled)
+    {
+        conf.enable_denylist_writes = enabled;
+    }
+
+    public static boolean getEnableDenylistReads()
+    {
+        return conf.enable_denylist_reads;
+    }
+
+    public static void setEnableDenylistReads(boolean enabled)
+    {
+        conf.enable_denylist_reads = enabled;
+    }
+
+    public static boolean getEnableDenylistRangeReads()
+    {
+        return conf.enable_denylist_range_reads;
+    }
+
+    public static void setEnableDenylistRangeReads(boolean enabled)
+    {
+        conf.enable_denylist_range_reads = enabled;
+    }
+
+    public static int getDenylistRefreshSeconds()
+    {
+        return conf.denylist_refresh_seconds;
+    }
+
+    public static void setDenylistRefreshSeconds(int seconds)
+    {
+        if (seconds <= 0)
+            throw new IllegalArgumentException("denylist_refresh_seconds must 
be a positive integer.");
+        conf.denylist_refresh_seconds = seconds;
+    }
+
+    public static int getDenylistInitialLoadRetrySeconds()
+    {
+        return conf.denylist_initial_load_retry_seconds;
+    }
+
+    public static void setDenylistInitialLoadRetrySeconds(int seconds)
+    {
+        if (seconds <= 0)
+            throw new 
IllegalArgumentException("denylist_initial_load_retry_seconds must be a 
positive integer.");
+        conf.denylist_initial_load_retry_seconds = seconds;
+    }
+
+    public static ConsistencyLevel getDenylistConsistencyLevel()
+    {
+        return conf.denylist_consistency_level;
+    }
+
+    public static void setDenylistConsistencyLevel(ConsistencyLevel cl)
+    {
+        conf.denylist_consistency_level = cl;
+    }
+
+    public static int getDenylistMaxKeysPerTable()
+    {
+        return conf.denylist_max_keys_per_table;
+    }
+
+    public static void setDenylistMaxKeysPerTable(int value)
+    {
+        if (value <= 0)
+            throw new IllegalArgumentException("denylist_max_keys_per_table 
must be a positive integer.");
+        conf.denylist_max_keys_per_table = value;
+    }
+
+    public static int getDenylistMaxKeysTotal()
+    {
+        return conf.denylist_max_keys_total;
+    }
+
+    public static void setDenylistMaxKeysTotal(int value)
+    {
+        if (value <= 0)
+            throw new IllegalArgumentException("denylist_max_keys_total must 
be a positive integer.");
+        conf.denylist_max_keys_total = value;
+    }
+
     public static SubnetGroups getClientErrorReportingExclusions()
     {
         return conf.client_error_reporting_exclusions;
diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilder.java 
b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
index d087224..8c840e9 100644
--- a/src/java/org/apache/cassandra/db/view/ViewBuilder.java
+++ b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
@@ -41,7 +41,7 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.RangesAtEndpoint;
 import org.apache.cassandra.locator.Replicas;
-import org.apache.cassandra.repair.SystemDistributedKeyspace;
+import org.apache.cassandra.schema.SystemDistributedKeyspace;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
diff --git a/src/java/org/apache/cassandra/db/view/ViewManager.java 
b/src/java/org/apache/cassandra/db/view/ViewManager.java
index 7e3ea1b..111f96a 100644
--- a/src/java/org/apache/cassandra/db/view/ViewManager.java
+++ b/src/java/org/apache/cassandra/db/view/ViewManager.java
@@ -32,7 +32,7 @@ import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.ViewMetadata;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.partitions.*;
-import org.apache.cassandra.repair.SystemDistributedKeyspace;
+import org.apache.cassandra.schema.SystemDistributedKeyspace;
 import org.apache.cassandra.schema.Views;
 import org.apache.cassandra.service.StorageService;
 
diff --git a/src/java/org/apache/cassandra/metrics/DenylistMetrics.java 
b/src/java/org/apache/cassandra/metrics/DenylistMetrics.java
new file mode 100644
index 0000000..0372787
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/DenylistMetrics.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.metrics;
+
+import com.codahale.metrics.Meter;
+
+import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
+
+public class DenylistMetrics
+{
+    private final Meter writesRejected;
+    private final Meter readsRejected;
+    private final Meter rangeReadsRejected;
+    private final Meter totalRequestsRejected;
+
+    public DenylistMetrics()
+    {
+        final MetricNameFactory factory = new 
DefaultNameFactory("StorageProxy", "PartitionDenylist");
+        writesRejected = 
Metrics.meter(factory.createMetricName("WriteRejected"));
+        readsRejected = 
Metrics.meter(factory.createMetricName("ReadRejected"));
+        rangeReadsRejected = 
Metrics.meter(factory.createMetricName("RangeReadRejected"));
+        totalRequestsRejected = 
Metrics.meter(factory.createMetricName("TotalRejected"));
+    }
+
+    public void incrementWritesRejected()
+    {
+        writesRejected.mark();
+        totalRequestsRejected.mark();
+    }
+
+    public void incrementReadsRejected()
+    {
+        readsRejected.mark();
+        totalRequestsRejected.mark();
+    }
+
+    public void incrementRangeReadsRejected()
+    {
+        rangeReadsRejected.mark();
+        totalRequestsRejected.mark();
+    }
+}
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java 
b/src/java/org/apache/cassandra/repair/RepairJob.java
index 1e203c4..a57ca84 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.repair.asymmetric.HostDifferences;
 import org.apache.cassandra.repair.asymmetric.PreferedNodeFilter;
 import org.apache.cassandra.repair.asymmetric.ReduceHelper;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.schema.SystemDistributedKeyspace;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.dht.Range;
diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java 
b/src/java/org/apache/cassandra/repair/RepairRunnable.java
index faa0a74..30244a7 100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@ -36,11 +36,6 @@ import org.slf4j.LoggerFactory;
 import com.codahale.metrics.Timer;
 import org.apache.cassandra.concurrent.ExecutorPlus;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.exceptions.RepairException;
-import org.apache.cassandra.metrics.RepairMetrics;
-import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.repair.consistent.SyncStatSummary;
-import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
@@ -50,6 +45,7 @@ import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.RepairException;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.locator.EndpointsForRange;
 import org.apache.cassandra.locator.InetAddressAndPort;
@@ -60,8 +56,11 @@ import 
org.apache.cassandra.repair.consistent.CoordinatorSession;
 import org.apache.cassandra.repair.consistent.SyncStatSummary;
 import org.apache.cassandra.repair.messages.RepairOption;
 import org.apache.cassandra.schema.SchemaConstants;
-import org.apache.cassandra.service.*;
+import org.apache.cassandra.schema.SystemDistributedKeyspace;
+import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.ActiveRepairService.ParentRepairStatus;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.tracing.TraceKeyspace;
 import org.apache.cassandra.tracing.TraceState;
@@ -73,19 +72,18 @@ import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Throwables;
 import org.apache.cassandra.utils.UUIDGen;
 import org.apache.cassandra.utils.WrappedRunnable;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.FutureCombiner;
+import org.apache.cassandra.utils.concurrent.ImmediateFuture;
 import org.apache.cassandra.utils.progress.ProgressEvent;
 import org.apache.cassandra.utils.progress.ProgressEventNotifier;
 import org.apache.cassandra.utils.progress.ProgressEventType;
 import org.apache.cassandra.utils.progress.ProgressListener;
 
+import static 
org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
 import static org.apache.cassandra.service.QueryState.forInternalCalls;
 import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
 import static org.apache.cassandra.utils.Clock.Global.nanoTime;
-import org.apache.cassandra.utils.concurrent.Future;
-import org.apache.cassandra.utils.concurrent.FutureCombiner;
-import org.apache.cassandra.utils.concurrent.ImmediateFuture;
-
-import static 
org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
 
 public class RepairRunnable implements Runnable, ProgressEventNotifier
 {
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java 
b/src/java/org/apache/cassandra/repair/RepairSession.java
index 1c7d6c9..2e2b36b 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -50,6 +50,7 @@ import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.repair.consistent.ConsistentSession;
 import org.apache.cassandra.repair.consistent.LocalSession;
 import org.apache.cassandra.repair.consistent.LocalSessions;
+import org.apache.cassandra.schema.SystemDistributedKeyspace;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.streaming.SessionSummary;
diff --git a/src/java/org/apache/cassandra/schema/PartitionDenylist.java 
b/src/java/org/apache/cassandra/schema/PartitionDenylist.java
new file mode 100644
index 0000000..c9aee97
--- /dev/null
+++ b/src/java/org/apache/cassandra/schema/PartitionDenylist.java
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.schema;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableSortedSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.reads.range.RangeCommands;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.NoSpamLogger;
+
+import static 
org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
+import static org.apache.cassandra.cql3.QueryProcessor.process;
+import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
+
+/**
+ * PartitionDenylist uses the system_distributed.partition_denylist table to 
maintain a list of denylisted partition keys
+ * for each keyspace/table.
+ *
+ * Keys can be entered manually into the partition_denylist table or via the 
JMX operation StorageProxyMBean.denylistKey
+ *
+ * The denylist is stored as one CQL partition per table, and the denylisted 
keys are column names in that partition. The denylisted
+ * keys for each table are cached in memory, and reloaded from the 
partition_denylist table every 10 minutes (default) or when the
+ * StorageProxyMBean.loadPartitionDenylist is called via JMX.
+ *
+ * Concurrency of the cache is provided by the concurrency semantics of the 
Caffeine LoadingCache. All values (DenylistEntry) are
+ * immutable collections of keys/tokens which are replaced in whole when the 
cache refreshes from disk.
+ *
+ * The CL for the denylist is used on initial node load as well as on timer 
instigated cache refreshes. A JMX call by the
+ * operator to load the denylist cache will warn on CL unavailability but go 
through with the denylist load. This is to
+ * allow operators flexibility in the face of degraded cluster state and still 
grant them the ability to mutate the denylist
+ * cache and bring it up if there are things they need to block on startup.
+ *
+ * Notably, in the current design it's possible for a table *cache expiration 
instigated* reload to end up violating the
+ * contract on total denylisted keys allowed in the case where it initially 
loads with a value less than the DBD
+ * allowable max per table limit due to global constraint enforcement on 
initial load. Our load and reload function
+ * simply enforce the *per table* limit without consideration to what that 
entails at the global key level. While we
+ * could track the constrained state and count in DenylistEntry, for now the 
complexity doesn't seem to justify the
+ * protection against that edge case. The enforcement should take place on a 
user-instigated full reload as well as
+ * error messaging about count violations, so this only applies to situations 
in which someone adds a key and doesn't
+ * actively tell the cache to fully reload to take that key into 
consideration, which one could reasonably expect to be
+ * an antipattern.
+ */
+public class PartitionDenylist
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(PartitionDenylist.class);
+    private static final NoSpamLogger AVAILABILITY_LOGGER = 
NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES);
+
+    private final ExecutorService executor = 
executorFactory().pooled("DenylistCache", 2);
+
+    /** We effectively don't use our initial empty cache to denylist until the 
{@link #load()} call which will replace it */
+    private volatile LoadingCache<TableId, DenylistEntry> denylist = 
buildEmptyCache();
+
+    /** Denylist entry is never mutated once constructed, only replaced with a 
new entry when the cache is refreshed */
+    private static class DenylistEntry
+    {
+        public final ImmutableSet<ByteBuffer> keys;
+        public final ImmutableSortedSet<Token> tokens;
+
+        public DenylistEntry()
+        {
+            keys = ImmutableSet.of();
+            tokens = ImmutableSortedSet.of();
+        }
+
+        public DenylistEntry(final ImmutableSet<ByteBuffer> keys, final 
ImmutableSortedSet<Token> tokens)
+        {
+            this.keys = keys;
+            this.tokens = tokens;
+        }
+    }
+
+    /** synchronized on this */
+    private int loadAttempts = 0;
+
+    /** synchronized on this */
+    private int loadSuccesses = 0;
+
+    public synchronized int getLoadAttempts()
+    {
+        return loadAttempts;
+    }
+    public synchronized int getLoadSuccesses()
+    {
+        return loadSuccesses;
+    }
+
+    /**
+     * Performs initial load of the partition denylist. Should be called at 
startup and only loads if the operation
+     * is expected to succeed.  If it is not possible to load at call time, a 
timer is set to retry.
+     */
+    public void initialLoad()
+    {
+        if (!DatabaseDescriptor.getEnablePartitionDenylist())
+            return;
+
+        synchronized (this)
+        {
+            loadAttempts++;
+        }
+
+        // Check if there are sufficient nodes to attempt reading all the 
denylist partitions before issuing the query.
+        // The pre-check prevents definite range-slice unavailables being 
marked and triggering an alert.  Nodes may still change
+        // state between the check and the query, but it should significantly 
reduce the alert volume.
+        String retryReason = "Insufficient nodes";
+        try
+        {
+            if (checkDenylistNodeAvailability())
+            {
+                load();
+                return;
+            }
+        }
+        catch (Throwable tr)
+        {
+            logger.error("Failed to load partition denylist", tr);
+            retryReason = "Exception";
+        }
+
+        // This path will also be taken on failures other than 
UnavailableException,
+        // but seems like a good idea to retry anyway.
+        int retryInSeconds = 
DatabaseDescriptor.getDenylistInitialLoadRetrySeconds();
+        logger.info("{} while loading partition denylist cache. Scheduled 
retry in {} seconds.", retryReason, retryInSeconds);
+        ScheduledExecutors.optionalTasks.schedule(this::initialLoad, 
retryInSeconds, TimeUnit.SECONDS);
+    }
+
+    private boolean checkDenylistNodeAvailability()
+    {
+        boolean sufficientNodes = 
RangeCommands.sufficientLiveNodesForSelectStar(SystemDistributedKeyspace.PartitionDenylistTable,
+                                                                               
  DatabaseDescriptor.getDenylistConsistencyLevel());
+        if (!sufficientNodes)
+        {
+            AVAILABILITY_LOGGER.warn("Attempting to load denylist and not 
enough nodes are available for a {} refresh. Reload the denylist when 
unavailable nodes are recovered to ensure your denylist remains in sync.",
+                                     
DatabaseDescriptor.getDenylistConsistencyLevel());
+        }
+        return sufficientNodes;
+    }
+
+    /** Helper method as we need to build the cache both on initial init and 
on reload of cache contents and params */
+    private LoadingCache<TableId, DenylistEntry> buildEmptyCache()
+    {
+        // We rely on details of .refreshAfterWrite to reload this async in 
the background when it's hit:
+        // https://github.com/ben-manes/caffeine/wiki/Refresh
+        return Caffeine.newBuilder()
+                       
.refreshAfterWrite(DatabaseDescriptor.getDenylistRefreshSeconds(), 
TimeUnit.SECONDS)
+                       .executor(executor)
+                       .build(new CacheLoader<TableId, DenylistEntry>()
+                       {
+                           @Override
+                           public DenylistEntry load(final TableId tid)
+                           {
+                               // We load whether or not the CL-required node 
count is available, as the alternative is an
+                               // empty denylist. This allows operators to 
intervene in the event they need to deny or
+                               // undeny a specific partition key around a 
node recovery.
+                               checkDenylistNodeAvailability();
+                               return getDenylistForTableFromCQL(tid);
+                           }
+
+                           // The synchronous reload method defaults to being 
wrapped with a supplyAsync in CacheLoader.asyncReload
+                           @Override
+                           public DenylistEntry reload(final TableId tid, 
final DenylistEntry oldValue)
+                           {
+                               // Only process when we can hit the user 
specified CL for the denylist consistency on a timer prompted reload
+                               if (checkDenylistNodeAvailability())
+                               {
+                                   final DenylistEntry newEntry = 
getDenylistForTableFromCQL(tid);
+                                   if (newEntry != null)
+                                       return newEntry;
+                               }
+                               if (oldValue != null)
+                                   return oldValue;
+                               return new DenylistEntry();
+                           }
+                       });
+    }
+
+    /**
+     * We need to fully rebuild a new cache to accommodate deleting items from 
the denylist and potentially shrinking
+     * the max allowable size in the list. We do not serve queries out of this 
denylist until it is populated
+     * so as not to introduce a window of having a partially filled cache 
allow denylisted entries.
+     */
+    public void load()
+    {
+        final long start = currentTimeMillis();
+
+        final Map<TableId, DenylistEntry> allDenylists = 
getDenylistForAllTablesFromCQL();
+
+        // On initial load we have the slight overhead of GC'ing our initial 
empty cache
+        LoadingCache<TableId, DenylistEntry> newDenylist = buildEmptyCache();
+        newDenylist.putAll(allDenylists);
+
+        synchronized (this)
+        {
+            loadSuccesses++;
+        }
+        denylist = newDenylist;
+        logger.info("Loaded partition denylist cache in {}ms", 
currentTimeMillis() - start);
+    }
+
+    /**
+     * We expect the caller to confirm that we are working with a valid 
keyspace and table. Further, we expect the usage
+     * pattern of this to be one-off key by key, not in a bulk process, so we 
reload the entire table's deny list entry
+     * on an addition or removal.
+     */
+    public boolean addKeyToDenylist(final String keyspace, final String table, 
final ByteBuffer key)
+    {
+        if (!canDenylistKeyspace(keyspace))
+            return false;
+
+        final String insert = String.format("INSERT INTO 
system_distributed.partition_denylist (ks_name, table_name, key) VALUES ('%s', 
'%s', 0x%s)",
+                                            keyspace, table, 
ByteBufferUtil.bytesToHex(key));
+
+        try
+        {
+            process(insert, DatabaseDescriptor.getDenylistConsistencyLevel());
+            return refreshTableDenylist(keyspace, table);
+        }
+        catch (final RequestExecutionException e)
+        {
+            logger.error("Failed to denylist key [{}] in {}/{}", 
ByteBufferUtil.bytesToHex(key), keyspace, table, e);
+        }
+        return false;
+    }
+
+    /**
+     * We expect the caller to confirm that we are working with a valid 
keyspace and table.
+     */
+    public boolean removeKeyFromDenylist(final String keyspace, final String 
table, final ByteBuffer key)
+    {
+        final String delete = String.format("DELETE FROM 
system_distributed.partition_denylist " +
+                                            "WHERE ks_name = '%s' " +
+                                            "AND table_name = '%s' " +
+                                            "AND key = 0x%s",
+                                            keyspace, table, 
ByteBufferUtil.bytesToHex(key));
+
+        try
+        {
+            process(delete, DatabaseDescriptor.getDenylistConsistencyLevel());
+            return refreshTableDenylist(keyspace, table);
+        }
+        catch (final RequestExecutionException e)
+        {
+            logger.error("Failed to remove key from denylist: [{}] in {}/{}", 
ByteBufferUtil.bytesToHex(key), keyspace, table, e);
+        }
+        return false;
+    }
+
+    /**
+     * We disallow denylisting partitions in certain critical keyspaces to 
prevent users from making their clusters
+     * inoperable.
+     */
+    private boolean canDenylistKeyspace(final String keyspace)
+    {
+        return !SchemaConstants.DISTRIBUTED_KEYSPACE_NAME.equals(keyspace) &&
+               !SchemaConstants.SYSTEM_KEYSPACE_NAME.equals(keyspace) &&
+               !SchemaConstants.TRACE_KEYSPACE_NAME.equals(keyspace) &&
+               !SchemaConstants.VIRTUAL_SCHEMA.equals(keyspace) &&
+               !SchemaConstants.VIRTUAL_VIEWS.equals(keyspace) &&
+               !SchemaConstants.AUTH_KEYSPACE_NAME.equals(keyspace);
+    }
+
+    public boolean isKeyPermitted(final String keyspace, final String table, 
final ByteBuffer key)
+    {
+        return isKeyPermitted(getTableId(keyspace, table), key);
+    }
+
+    public boolean isKeyPermitted(final TableId tid, final ByteBuffer key)
+    {
+        final TableMetadata tmd = Schema.instance.getTableMetadata(tid);
+
+        // We have a few quick state checks to get out of the way first; this 
is hot path so we want to do these first if possible.
+        if (!DatabaseDescriptor.getEnablePartitionDenylist() || tid == null || 
tmd == null || !canDenylistKeyspace(tmd.keyspace))
+            return true;
+
+        try
+        {
+            // If we don't have an entry for this table id, nothing in it is 
denylisted.
+            DenylistEntry entry = denylist.get(tid);
+            if (entry == null)
+                return true;
+            return !entry.keys.contains(key);
+        }
+        catch (final Exception e)
+        {
+            // In the event of an error accessing or populating the cache, 
assume it's not denylisted
+            logAccessFailure(tid, e);
+            return true;
+        }
+    }
+
+    private void logAccessFailure(final TableId tid, Throwable e)
+    {
+        final TableMetadata tmd = Schema.instance.getTableMetadata(tid);
+        if (tmd == null)
+            logger.debug("Failed to access partition denylist cache for 
unknown table id {}", tid.toString(), e);
+        else
+            logger.debug("Failed to access partition denylist cache for 
{}/{}", tmd.keyspace, tmd.name, e);
+    }
+
+    /**
+     * @return number of denylisted keys in range
+     */
+    public int getDeniedKeysInRangeCount(final String keyspace, final String 
table, final AbstractBounds<PartitionPosition> range)
+    {
+        return getDeniedKeysInRangeCount(getTableId(keyspace, table), range);
+    }
+
+    /**
+     * @return number of denylisted keys in range
+     */
+    public int getDeniedKeysInRangeCount(final TableId tid, final 
AbstractBounds<PartitionPosition> range)
+    {
+        final TableMetadata tmd = Schema.instance.getTableMetadata(tid);
+        if (!DatabaseDescriptor.getEnablePartitionDenylist() || tid == null || 
tmd == null || !canDenylistKeyspace(tmd.keyspace))
+            return 0;
+
+        try
+        {
+            final DenylistEntry denylistEntry = denylist.get(tid);
+            if (denylistEntry == null || denylistEntry.tokens.size() == 0)
+                return 0;
+            final Token startToken = range.left.getToken();
+            final Token endToken = range.right.getToken();
+
+            // Normal case
+            if (startToken.compareTo(endToken) <= 0 || endToken.isMinimum())
+            {
+                NavigableSet<Token> subSet = 
denylistEntry.tokens.tailSet(startToken, PartitionPosition.Kind.MIN_BOUND == 
range.left.kind());
+                if (!endToken.isMinimum())
+                    subSet = subSet.headSet(endToken, 
PartitionPosition.Kind.MAX_BOUND == range.right.kind());
+                return subSet.size();
+            }
+
+            // Wrap around case
+            return denylistEntry.tokens.tailSet(startToken, 
PartitionPosition.Kind.MIN_BOUND == range.left.kind()).size()
+                   + denylistEntry.tokens.headSet(endToken, 
PartitionPosition.Kind.MAX_BOUND == range.right.kind()).size();
+        }
+        catch (final Exception e)
+        {
+            logAccessFailure(tid, e);
+            return 0;
+        }
+    }
+
+    /**
+     * Get up to the configured allowable limit per table of denylisted keys
+     */
+    private DenylistEntry getDenylistForTableFromCQL(final TableId tid)
+    {
+        return getDenylistForTableFromCQL(tid, 
DatabaseDescriptor.getDenylistMaxKeysPerTable());
+    }
+
+    /**
+     * Attempts to reload the DenylistEntry data from CQL for the given 
TableId and key count.
+     * @return null if we do not find the data; allows cache reloader to 
preserve old value
+     */
+    private DenylistEntry getDenylistForTableFromCQL(final TableId tid, int 
limit)
+    {
+        final TableMetadata tmd = Schema.instance.getTableMetadata(tid);
+        if (tmd == null)
+            return null;
+
+        // We attempt to query just over our allowable max keys in order to 
check whether we have configured data beyond that limit and alert the user if so
+        final String readDenylist = String.format("SELECT * FROM %s.%s WHERE 
ks_name='%s' AND table_name='%s' LIMIT %d",
+                                                  
SchemaConstants.DISTRIBUTED_KEYSPACE_NAME,
+                                                  
SystemDistributedKeyspace.PARTITION_DENYLIST_TABLE,
+                                                  tmd.keyspace,
+                                                  tmd.name,
+                                                  limit + 1);
+
+        try
+        {
+            final UntypedResultSet results = process(readDenylist, 
DatabaseDescriptor.getDenylistConsistencyLevel());
+
+            // If there's no data in CQL we want to return an empty 
DenylistEntry so we don't continue using the old value in the cache
+            if (results == null || results.isEmpty())
+                return new DenylistEntry();
+
+            if (results.size() > limit)
+            {
+                // If our limit is < the standard per table we know we're at a 
global violation because we've constrained that request limit already.
+                boolean globalLimit = limit != 
DatabaseDescriptor.getDenylistMaxKeysPerTable();
+                String violationType = globalLimit ? "global" : "per-table";
+                int errorLimit = globalLimit ? 
DatabaseDescriptor.getDenylistMaxKeysTotal() : limit;
+                logger.error("Partition denylist for {}/{} has exceeded the {} 
allowance of ({}). Remaining keys were ignored; " +
+                             "please reduce the total number of keys denied or 
increase the denylist_max_keys_per_table param in " +
+                             "cassandra.yaml to avoid inconsistency in denied 
partitions across nodes.",
+                             tmd.keyspace,
+                             tmd.name,
+                             violationType,
+                             errorLimit);
+            }
+
+            final Set<ByteBuffer> keys = new HashSet<>();
+            final NavigableSet<Token> tokens = new TreeSet<>();
+
+            int processed = 0;
+            for (final UntypedResultSet.Row row : results)
+            {
+                final ByteBuffer key = row.getBlob("key");
+                keys.add(key);
+                
tokens.add(StorageService.instance.getTokenMetadata().partitioner.getToken(key));
+
+                processed++;
+                if (processed >= limit)
+                    break;
+            }
+            return new DenylistEntry(ImmutableSet.copyOf(keys), 
ImmutableSortedSet.copyOf(tokens));
+        }
+        catch (final RequestExecutionException e)
+        {
+            logger.error("Error reading partition_denylist table for {}/{}. 
Returning empty list.", tmd.keyspace, tmd.name, e);
+            return null;
+        }
+    }
+
+    /**
+     * This method relies on {@link #getDenylistForTableFromCQL(TableId, int)} 
to pull a limited amount of keys
+     * on a per-table basis from CQL to load into the cache. We need to 
navigate both respecting the max cache size limit
+     * as well as respecting the per-table limit.
+     * @return non-null mapping of TableId to DenylistEntry
+     */
+    private Map<TableId, DenylistEntry> getDenylistForAllTablesFromCQL()
+    {
+        // While we warn the user in this case, we continue with the reload 
anyway.
+        checkDenylistNodeAvailability();
+
+        final String allDeniedTables = String.format("SELECT DISTINCT ks_name, 
table_name FROM %s.%s",
+                                                     
SchemaConstants.DISTRIBUTED_KEYSPACE_NAME,
+                                                     
SystemDistributedKeyspace.PARTITION_DENYLIST_TABLE);
+        try
+        {
+            final UntypedResultSet deniedTableResults = 
process(allDeniedTables, DatabaseDescriptor.getDenylistConsistencyLevel());
+            if (deniedTableResults == null || deniedTableResults.isEmpty())
+                return Collections.emptyMap();
+
+            int totalProcessed = 0 ;
+            final Map<TableId, DenylistEntry> results = new HashMap<>();
+            for (final UntypedResultSet.Row row : deniedTableResults)
+            {
+                final String ks = row.getString("ks_name");
+                final String table = row.getString("table_name");
+                final TableId tid = getTableId(ks, table);
+                if (DatabaseDescriptor.getDenylistMaxKeysTotal() - 
totalProcessed <= 0)
+                {
+                    logger.error("Hit limit on allowable denylisted keys in 
total. Processed {} total entries. Not adding all entries to denylist for 
{}/{}." +
+                                 " Remove denylist entries in 
system_distributed.{} or increase your denylist_max_keys_total param in 
cassandra.yaml.",
+                                 totalProcessed,
+                                 ks,
+                                 table,
+                                 
SystemDistributedKeyspace.PARTITION_DENYLIST_TABLE);
+                    results.put(tid, new DenylistEntry());
+                }
+                else
+                {
+                    // Determine whether we can get up to table max or we need 
a subset at edge condition of max overflow.
+                    int allowedTableRecords = 
Math.min(DatabaseDescriptor.getDenylistMaxKeysPerTable(), 
DatabaseDescriptor.getDenylistMaxKeysTotal() - totalProcessed);
+                    DenylistEntry tableDenylist = 
getDenylistForTableFromCQL(tid, allowedTableRecords);
+                    if (tableDenylist != null)
+                        totalProcessed += tableDenylist.keys.size();
+                    results.put(tid, tableDenylist);
+                }
+            }
+            return results;
+        }
+        catch (final RequestExecutionException e)
+        {
+            logger.error("Error reading full partition denylist from "
+                         + SchemaConstants.DISTRIBUTED_KEYSPACE_NAME + "." + 
SystemDistributedKeyspace.PARTITION_DENYLIST_TABLE +
+                         ". Partition Denylisting will be compromised. 
Exception: " + e);
+            return Collections.emptyMap();
+        }
+    }
+
+    private boolean refreshTableDenylist(String keyspace, String table)
+    {
+        checkDenylistNodeAvailability();
+        final TableId tid = getTableId(keyspace, table);
+        if (tid == null)
+        {
+            logger.warn("Got denylist mutation for unknown ks/cf: {}/{}. 
Skipping refresh.", keyspace, table);
+            return false;
+        }
+
+        DenylistEntry newEntry = getDenylistForTableFromCQL(tid);
+        denylist.put(tid, newEntry);
+        return true;
+    }
+
+    private TableId getTableId(final String keyspace, final String table)
+    {
+        TableMetadata tmd = Schema.instance.getTableMetadata(keyspace, table);
+        return tmd == null ? null : tmd.id;
+    }
+}
diff --git a/src/java/org/apache/cassandra/schema/SchemaConstants.java 
b/src/java/org/apache/cassandra/schema/SchemaConstants.java
index 7b6b7de..870fa99 100644
--- a/src/java/org/apache/cassandra/schema/SchemaConstants.java
+++ b/src/java/org/apache/cassandra/schema/SchemaConstants.java
@@ -28,6 +28,9 @@ import com.google.common.collect.ImmutableSet;
 
 import org.apache.cassandra.db.Digest;
 
+/**
+ * When adding new String keyspace names here, double check if it needs to be 
added to PartitionDenylist.canDenylistKeyspace
+ */
 public final class SchemaConstants
 {
     public static final Pattern PATTERN_WORD_CHARS = Pattern.compile("\\w+");
diff --git 
a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java 
b/src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java
similarity index 95%
rename from src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
rename to src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java
index 5df38a7..a539686 100644
--- a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.cassandra.repair;
+package org.apache.cassandra.schema;
 
 import java.io.PrintWriter;
 import java.io.StringWriter;
@@ -49,14 +49,8 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.repair.CommonRange;
 import org.apache.cassandra.repair.messages.RepairOption;
-import org.apache.cassandra.schema.CompactionParams;
-import org.apache.cassandra.schema.KeyspaceMetadata;
-import org.apache.cassandra.schema.KeyspaceParams;
-import org.apache.cassandra.schema.SchemaConstants;
-import org.apache.cassandra.schema.TableId;
-import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.schema.Tables;
 import org.apache.cassandra.utils.FBUtilities;
 
 import static java.lang.String.format;
@@ -69,6 +63,8 @@ public final class SystemDistributedKeyspace
     {
     }
 
+    public static final String NAME = "system_distributed";
+
     private static final int DEFAULT_RF = 
CassandraRelevantProperties.SYSTEM_DISTRIBUTED_DEFAULT_RF.getInt();
     private static final Logger logger = 
LoggerFactory.getLogger(SystemDistributedKeyspace.class);
 
@@ -83,8 +79,9 @@ public final class SystemDistributedKeyspace
      * gen 3: gc_grace_seconds raised from 0 to 10 days in CASSANDRA-12954 in 
3.11.0
      * gen 4: compression chunk length reduced to 16KiB, 
memtable_flush_period_in_ms now unset on all tables in 4.0
      * gen 5: add ttl and TWCS to repair_history tables
+     * gen 6: add denylist table
      */
-    public static final long GENERATION = 5;
+    public static final long GENERATION = 6;
 
     public static final String REPAIR_HISTORY = "repair_history";
 
@@ -92,6 +89,8 @@ public final class SystemDistributedKeyspace
 
     public static final String VIEW_BUILD_STATUS = "view_build_status";
 
+    public static final String PARTITION_DENYLIST_TABLE = "partition_denylist";
+
     private static final TableMetadata RepairHistory =
         parse(REPAIR_HISTORY,
                 "Repair history",
@@ -147,6 +146,16 @@ public final class SystemDistributedKeyspace
                      + "status text,"
                      + "PRIMARY KEY ((keyspace_name, view_name), 
host_id))").build();
 
+    public static final TableMetadata PartitionDenylistTable =
+    parse(PARTITION_DENYLIST_TABLE,
+          "Partition keys which have been denied access",
+          "CREATE TABLE %s ("
+          + "ks_name text,"
+          + "table_name text,"
+          + "key blob,"
+          + "PRIMARY KEY ((ks_name, table_name), key))")
+    .build();
+
     private static TableMetadata.Builder parse(String table, String 
description, String cql)
     {
         return CreateTableStatement.parse(format(cql, table), 
SchemaConstants.DISTRIBUTED_KEYSPACE_NAME)
@@ -156,7 +165,7 @@ public final class SystemDistributedKeyspace
 
     public static KeyspaceMetadata metadata()
     {
-        return 
KeyspaceMetadata.create(SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, 
KeyspaceParams.simple(Math.max(DEFAULT_RF, 
DatabaseDescriptor.getDefaultKeyspaceRF())), Tables.of(RepairHistory, 
ParentRepairHistory, ViewBuildStatus));
+        return 
KeyspaceMetadata.create(SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, 
KeyspaceParams.simple(Math.max(DEFAULT_RF, 
DatabaseDescriptor.getDefaultKeyspaceRF())), Tables.of(RepairHistory, 
ParentRepairHistory, ViewBuildStatus, PartitionDenylistTable));
     }
 
     public static void startParentRepair(UUID parent_id, String keyspaceName, 
String[] cfnames, RepairOption options)
@@ -214,8 +223,8 @@ public final class SystemDistributedKeyspace
 
     public static void startRepairs(UUID id, UUID parent_id, String 
keyspaceName, String[] cfnames, CommonRange commonRange)
     {
-        //Don't record repair history if an upgrade is in progress as version 
3 nodes generates errors
-        //due to schema differences
+        // Don't record repair history if an upgrade is in progress as version 
3 nodes generate errors
+        // due to schema differences
         boolean includeNewColumns = !Gossiper.instance.hasMajorVersion3Nodes();
 
         InetAddressAndPort coordinator = 
FBUtilities.getBroadcastAddressAndPort();
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index c8495dc..fd20230 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -44,10 +44,6 @@ import com.google.common.cache.CacheLoader;
 import com.google.common.collect.Iterables;
 import com.google.common.primitives.Ints;
 import com.google.common.util.concurrent.Uninterruptibles;
-
-import org.apache.cassandra.utils.Clock;
-import org.apache.cassandra.utils.concurrent.CountDownLatch;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,7 +51,7 @@ import org.apache.cassandra.batchlog.Batch;
 import org.apache.cassandra.batchlog.BatchlogManager;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.RejectException;
+import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.CounterMutation;
 import org.apache.cassandra.db.DecoratedKey;
@@ -67,6 +63,7 @@ import org.apache.cassandra.db.PartitionRangeReadCommand;
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.ReadExecutionController;
 import org.apache.cassandra.db.ReadResponse;
+import org.apache.cassandra.db.RejectException;
 import org.apache.cassandra.db.SinglePartitionReadCommand;
 import org.apache.cassandra.db.TruncateRequest;
 import org.apache.cassandra.db.WriteType;
@@ -79,12 +76,12 @@ import 
org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 import org.apache.cassandra.db.rows.RowIterator;
 import org.apache.cassandra.db.view.ViewUtils;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.exceptions.ReadAbortException;
 import org.apache.cassandra.exceptions.CasWriteTimeoutException;
 import org.apache.cassandra.exceptions.CasWriteUnknownResultException;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.IsBootstrappingException;
 import org.apache.cassandra.exceptions.OverloadedException;
+import org.apache.cassandra.exceptions.ReadAbortException;
 import org.apache.cassandra.exceptions.ReadFailureException;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
 import org.apache.cassandra.exceptions.RequestFailureException;
@@ -109,6 +106,7 @@ import org.apache.cassandra.metrics.CASClientRequestMetrics;
 import org.apache.cassandra.metrics.CASClientWriteRequestMetrics;
 import org.apache.cassandra.metrics.ClientRequestMetrics;
 import org.apache.cassandra.metrics.ClientWriteRequestMetrics;
+import org.apache.cassandra.metrics.DenylistMetrics;
 import org.apache.cassandra.metrics.ReadRepairMetrics;
 import org.apache.cassandra.metrics.StorageMetrics;
 import org.apache.cassandra.metrics.ViewWriteMetrics;
@@ -118,8 +116,10 @@ import org.apache.cassandra.net.MessageFlag;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.RequestCallback;
 import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.schema.PartitionDenylist;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.paxos.Commit;
 import org.apache.cassandra.service.paxos.PaxosState;
@@ -131,11 +131,13 @@ import 
org.apache.cassandra.service.reads.range.RangeCommands;
 import org.apache.cassandra.service.reads.repair.ReadRepair;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.triggers.TriggerExecutor;
+import org.apache.cassandra.utils.Clock;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MBeanWrapper;
 import org.apache.cassandra.utils.MonotonicClock;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.UUIDGen;
+import org.apache.cassandra.utils.concurrent.CountDownLatch;
 import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
 
 import static com.google.common.collect.Iterables.concat;
@@ -180,11 +182,14 @@ public class StorageProxy implements StorageProxyMBean
     private static final ViewWriteMetrics viewWriteMetrics = new 
ViewWriteMetrics("ViewWrite");
     private static final Map<ConsistencyLevel, ClientRequestMetrics> 
readMetricsMap = new EnumMap<>(ConsistencyLevel.class);
     private static final Map<ConsistencyLevel, ClientWriteRequestMetrics> 
writeMetricsMap = new EnumMap<>(ConsistencyLevel.class);
+    private static final DenylistMetrics denylistMetrics = new 
DenylistMetrics();
 
     private static final String DISABLE_SERIAL_READ_LINEARIZABILITY_KEY = 
"cassandra.unsafe.disable-serial-reads-linearizability";
     private static final boolean disableSerialReadLinearizability =
         
Boolean.parseBoolean(System.getProperty(DISABLE_SERIAL_READ_LINEARIZABILITY_KEY,
 "false"));
 
+    private static final PartitionDenylist partitionDenylist = new 
PartitionDenylist();
+
     private StorageProxy()
     {
     }
@@ -297,6 +302,13 @@ public class StorageProxy implements StorageProxyMBean
         {
             TableMetadata metadata = 
Schema.instance.validateTable(keyspaceName, cfName);
 
+            if (DatabaseDescriptor.getEnablePartitionDenylist() && 
DatabaseDescriptor.getEnableDenylistWrites() && 
!partitionDenylist.isKeyPermitted(keyspaceName, cfName, key.getKey()))
+            {
+                denylistMetrics.incrementWritesRejected();
+                throw new InvalidRequestException(String.format("Unable to CAS 
write to denylisted partition [0x%s] in %s/%s",
+                                                                
key.toString(), keyspaceName, cfName));
+            }
+
             Supplier<Pair<PartitionUpdate, RowIterator>> updateProposer = () ->
             {
                 // read the current values and check they validate the 
conditions
@@ -1056,6 +1068,25 @@ public class StorageProxy implements StorageProxyMBean
                                           long queryStartNanoTime)
     throws WriteTimeoutException, WriteFailureException, UnavailableException, 
OverloadedException, InvalidRequestException
     {
+        if (DatabaseDescriptor.getEnablePartitionDenylist() && 
DatabaseDescriptor.getEnableDenylistWrites())
+        {
+            for (final IMutation mutation : mutations)
+            {
+                for (final TableId tid : mutation.getTableIds())
+                {
+                    if (!partitionDenylist.isKeyPermitted(tid, 
mutation.key().getKey()))
+                    {
+                        denylistMetrics.incrementWritesRejected();
+                        // While Schema.instance.getTableMetadata() can return 
a null value, in this case the isKeyPermitted
+                        // call above ensures that we cannot have a null 
associated tid at this point.
+                        final TableMetadata tmd = 
Schema.instance.getTableMetadata(tid);
+                        throw new 
InvalidRequestException(String.format("Unable to write to denylisted partition 
[0x%s] in %s/%s",
+                                                                        
mutation.key().toString(), tmd.keyspace, tmd.name));
+                    }
+                }
+            }
+        }
+
         Collection<Mutation> augmented = 
TriggerExecutor.instance.execute(mutations);
 
         boolean updatesView = 
Keyspace.open(mutations.iterator().next().getKeyspaceName())
@@ -1739,6 +1770,19 @@ public class StorageProxy implements StorageProxyMBean
             throw new IsBootstrappingException();
         }
 
+        if (DatabaseDescriptor.getEnablePartitionDenylist() && 
DatabaseDescriptor.getEnableDenylistReads())
+        {
+            for (SinglePartitionReadCommand command : group.queries)
+            {
+                if (!partitionDenylist.isKeyPermitted(command.metadata().id, 
command.partitionKey().getKey()))
+                {
+                    denylistMetrics.incrementReadsRejected();
+                    throw new InvalidRequestException(String.format("Unable to 
read denylisted partition [0x%s] in %s/%s",
+                                                                    
command.partitionKey().toString(), command.metadata().keyspace, 
command.metadata().name));
+                }
+            }
+        }
+
         return consistencyLevel.isSerialConsistency()
              ? readWithPaxos(group, consistencyLevel, state, 
queryStartNanoTime)
              : readRegular(group, consistencyLevel, queryStartNanoTime);
@@ -2072,6 +2116,18 @@ public class StorageProxy implements StorageProxyMBean
                                                   ConsistencyLevel 
consistencyLevel,
                                                   long queryStartNanoTime)
     {
+        if (DatabaseDescriptor.getEnablePartitionDenylist() && 
DatabaseDescriptor.getEnableDenylistRangeReads())
+        {
+            final int denylisted = 
partitionDenylist.getDeniedKeysInRangeCount(command.metadata().id, 
command.dataRange().keyRange());
+            if (denylisted > 0)
+            {
+                denylistMetrics.incrementRangeReadsRejected();
+                String tokens = command.loggableTokens();
+                throw new InvalidRequestException(String.format("Attempted to 
read a range containing %d denylisted keys in %s/%s." +
+                                                                " Range read: 
%s", denylisted, command.metadata().keyspace, command.metadata().name,
+                                                                tokens));
+            }
+        }
         return RangeCommands.partitions(command, consistencyLevel, 
queryStartNanoTime);
     }
 
@@ -2756,4 +2812,121 @@ public class StorageProxy implements StorageProxyMBean
     {
         DatabaseDescriptor.setCheckForDuplicateRowsDuringCompaction(false);
     }
+
+    public void initialLoadPartitionDenylist()
+    {
+        partitionDenylist.initialLoad();
+    }
+
+    @Override
+    public void loadPartitionDenylist()
+    {
+        partitionDenylist.load();
+    }
+
+    @Override
+    public int getPartitionDenylistLoadAttempts()
+    {
+        return partitionDenylist.getLoadAttempts();
+    }
+
+    @Override
+    public int getPartitionDenylistLoadSuccesses()
+    {
+        return partitionDenylist.getLoadSuccesses();
+    }
+
+    @Override
+    public void setEnablePartitionDenylist(boolean enabled)
+    {
+        DatabaseDescriptor.setEnablePartitionDenylist(enabled);
+    }
+
+    @Override
+    public void setEnableDenylistWrites(boolean enabled)
+    {
+        DatabaseDescriptor.setEnableDenylistWrites(enabled);
+    }
+
+    @Override
+    public void setEnableDenylistReads(boolean enabled)
+    {
+        DatabaseDescriptor.setEnableDenylistReads(enabled);
+    }
+
+    @Override
+    public void setEnableDenylistRangeReads(boolean enabled)
+    {
+        DatabaseDescriptor.setEnableDenylistRangeReads(enabled);
+    }
+
+    @Override
+    public void setDenylistMaxKeysPerTable(int value)
+    {
+        DatabaseDescriptor.setDenylistMaxKeysPerTable(value);
+    }
+
+    @Override
+    public void setDenylistMaxKeysTotal(int value)
+    {
+        DatabaseDescriptor.setDenylistMaxKeysTotal(value);
+    }
+
+    /**
+     * Actively denies read and write access to the provided Partition Key
+     * @param keyspace Name of keyspace containing the PK you wish to deny 
access to
+     * @param table Name of table containing the PK you wish to deny access to
+     * @param partitionKeyAsString String representation of the PK you want to 
deny access to
+     * @return true if successfully added, false if failure
+     */
+    @Override
+    public boolean denylistKey(String keyspace, String table, String 
partitionKeyAsString)
+    {
+        if (!Schema.instance.getKeyspaces().contains(keyspace))
+            return false;
+
+        final ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(keyspace, 
table);
+        if (cfs == null)
+            return false;
+
+        final ByteBuffer bytes = 
cfs.metadata.get().partitionKeyType.fromString(partitionKeyAsString);
+        return partitionDenylist.addKeyToDenylist(keyspace, table, bytes);
+    }
+
+    /**
+     * Attempts to remove the provided pk from the ks + table deny list
+     * @param keyspace Keyspace containing the pk to remove the denylist entry 
for
+     * @param table Table containing the pk to remove denylist entry for
+     * @param partitionKeyAsString String representation of the PK you want to 
re-allow access to
+     * @return true if found and removed, false if not
+     */
+    @Override
+    public boolean removeDenylistKey(String keyspace, String table, String 
partitionKeyAsString)
+    {
+        if (!Schema.instance.getKeyspaces().contains(keyspace))
+            return false;
+
+        final ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(keyspace, 
table);
+        if (cfs == null)
+            return false;
+
+        final ByteBuffer bytes = 
cfs.metadata.get().partitionKeyType.fromString(partitionKeyAsString);
+        return partitionDenylist.removeKeyFromDenylist(keyspace, table, bytes);
+    }
+
+    /**
+     * A simple check for operators to determine what the denylisted value for 
a pk is on a node
+     */
+    public boolean isKeyDenylisted(String keyspace, String table, String 
partitionKeyAsString)
+    {
+        if (!Schema.instance.getKeyspaces().contains(keyspace))
+            return false;
+
+        final ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(keyspace, 
table);
+        if (cfs == null)
+            return false;
+
+        final ByteBuffer bytes = 
cfs.metadata.get().partitionKeyType.fromString(partitionKeyAsString);
+        return !partitionDenylist.isKeyPermitted(keyspace, table, bytes);
+    }
 }
diff --git a/src/java/org/apache/cassandra/service/StorageProxyMBean.java 
b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
index 56c27e9..a89b877 100644
--- a/src/java/org/apache/cassandra/service/StorageProxyMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
@@ -63,6 +63,20 @@ public interface StorageProxyMBean
 
     @Deprecated
     public int getOtcBacklogExpirationInterval();
+
+    public void loadPartitionDenylist();
+    public int getPartitionDenylistLoadAttempts();
+    public int getPartitionDenylistLoadSuccesses();
+    public void setEnablePartitionDenylist(boolean enabled);
+    public void setEnableDenylistWrites(boolean enabled);
+    public void setEnableDenylistReads(boolean enabled);
+    public void setEnableDenylistRangeReads(boolean enabled);
+    public boolean denylistKey(String keyspace, String table, String 
partitionKeyAsString);
+    public boolean removeDenylistKey(String keyspace, String table, String 
partitionKeyAsString);
+    public void setDenylistMaxKeysPerTable(int value);
+    public void setDenylistMaxKeysTotal(int value);
+    public boolean isKeyDenylisted(String keyspace, String table, String 
partitionKeyAsString);
+
     @Deprecated
     public void setOtcBacklogExpirationInterval(int intervalInMillis);
 
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index 65a614a..5015f75 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -114,6 +114,7 @@ import org.apache.cassandra.schema.MigrationManager;
 import org.apache.cassandra.schema.ReplicationParams;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.schema.SystemDistributedKeyspace;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.schema.ViewMetadata;
@@ -1120,6 +1121,8 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
             {
                 logger.warn("Some data streaming failed. Use nodetool to check 
bootstrap state and resume. For more, see `nodetool help bootstrap`. {}", 
SystemKeyspace.getBootstrapState());
             }
+
+            StorageProxy.instance.initialLoadPartitionDenylist();
         }
         else
         {
diff --git 
a/src/java/org/apache/cassandra/service/reads/range/RangeCommands.java 
b/src/java/org/apache/cassandra/service/reads/range/RangeCommands.java
index 3452a35..5b656d7 100644
--- a/src/java/org/apache/cassandra/service/reads/range/RangeCommands.java
+++ b/src/java/org/apache/cassandra/service/reads/range/RangeCommands.java
@@ -24,10 +24,14 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.DataRange;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.PartitionRangeReadCommand;
 import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.exceptions.UnavailableException;
 import org.apache.cassandra.index.Index;
+import org.apache.cassandra.locator.ReplicaPlans;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
 import org.assertj.core.util.VisibleForTesting;
@@ -114,4 +118,26 @@ public class RangeCommands
         return (maxExpectedResults / DatabaseDescriptor.getNumTokens())
                / 
keyspace.getReplicationStrategy().getReplicationFactor().allReplicas;
     }
+
+    /**
+     * Added specifically to check for sufficient nodes live to serve 
partition denylist queries
+     */
+    public static boolean sufficientLiveNodesForSelectStar(TableMetadata 
metadata, ConsistencyLevel consistency)
+    {
+        try
+        {
+            Keyspace keyspace = Keyspace.open(metadata.keyspace);
+            ReplicaPlanIterator rangeIterator = new 
ReplicaPlanIterator(DataRange.allData(metadata.partitioner).keyRange(),
+                                                                        
keyspace, consistency);
+
+            // Called for the side effect of running 
assureSufficientLiveReplicasForRead.
+            // Deliberately called with an invalid vnode count in case it is 
used elsewhere in the future.
+            rangeIterator.forEachRemaining(r ->  
ReplicaPlans.forRangeRead(keyspace, consistency, r.range(), -1));
+            return true;
+        }
+        catch (UnavailableException e)
+        {
+            return false;
+        }
+    }
 }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/GossipSettlesTest.java 
b/test/distributed/org/apache/cassandra/distributed/test/GossipSettlesTest.java
index 5b9629a..341d854 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/GossipSettlesTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/GossipSettlesTest.java
@@ -33,7 +33,7 @@ import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.repair.SystemDistributedKeyspace;
+import org.apache.cassandra.schema.SystemDistributedKeyspace;
 import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/PartitionDenylistTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/PartitionDenylistTest.java
new file mode 100644
index 0000000..43b3ef8
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/PartitionDenylistTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.StorageService;
+
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
+
+public class PartitionDenylistTest extends TestBaseImpl
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(PartitionDenylistTest.class);
+    private static final int testReplicationFactor = 3;
+
+    // Create a four node cluster, populate with some denylist entries, stop 
all
+    // the nodes, then bring them up one by one, waiting for each node to 
complete
+    // startup before starting the next.
+    //
+    // On startup each node runs a SELECT * query on the partition denylist 
table
+    // to populate the cache.  The whole keyspace is unlikely to be available 
until
+    // three of the four nodes are started, so the early nodes will go through 
several
+    // cycles of failing to retrieve the partition denylist before succeeding.
+    //
+    // with(NETWORK, GOSSIP) is currently required for in-JVM dtests to create
+    // the distributed system tables.
+    @Test
+    public void checkStartupWithoutTriggeringUnavailable() throws IOException, 
InterruptedException, ExecutionException, TimeoutException
+    {
+        int nodeCount = 4;
+        System.setProperty("cassandra.ring_delay_ms", "5000"); // down from 
30s default
+        System.setProperty("cassandra.consistent.rangemovement", "false");
+        System.setProperty("cassandra.consistent.simultaneousmoves.allow", 
"true");
+        try (Cluster cluster = Cluster.build(nodeCount)
+                                      .withConfig(config -> config
+                                      .with(NETWORK)
+                                      .with(GOSSIP)
+                                      .set("enable_partition_denylist", true)
+                                      
.set("denylist_initial_load_retry_seconds", 1))
+                                      .createWithoutStarting())
+        {
+            cluster.forEach(i -> {
+                i.startup();
+                i.runOnInstance(PartitionDenylistTest::waitUntilStarted);
+            });
+
+            // Do a cluster-wide check that no unavailables were recorded while 
the denylist was loaded.
+            cluster.forEach(i -> 
i.runOnInstance(PartitionDenylistTest::checkNoUnavailables));
+        }
+    }
+
+    static private void waitUntilStarted()
+    {
+        waitUntilStarted(60, TimeUnit.SECONDS);
+    }
+
+    // To be called inside the instance with runOnInstance
+    static private void waitUntilStarted(int waitDuration, TimeUnit waitUnits)
+    {
+        long deadlineInMillis = currentTimeMillis() + Math.max(1, 
waitUnits.toMillis(waitDuration));
+        while (!StorageService.instance.getOperationMode().equals("NORMAL"))
+        {
+            if (currentTimeMillis() >= deadlineInMillis)
+            {
+                throw new RuntimeException("Instance did not reach application 
state NORMAL before timeout");
+            }
+            Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    // To be called inside the instance with runOnInstance
+    static private void checkNoUnavailables()
+    {
+        long deadlineInMillis = currentTimeMillis() + 
TimeUnit.SECONDS.toMillis(30);
+
+        while (currentTimeMillis() < deadlineInMillis &&
+               StorageProxy.instance.getPartitionDenylistLoadSuccesses() == 0)
+        {
+            Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+        }
+
+        Assert.assertTrue("Partition denylist must have loaded before checking 
unavailables",
+                          
StorageProxy.instance.getPartitionDenylistLoadSuccesses() > 0);
+    }
+
+    // To be called inside the instance with runOnInstance, no nodes are 
started/stopped
+    // and not enough nodes are available to succeed, so it should just retry 
a few times
+    static private void checkTimerActive()
+    {
+        long deadlineInMillis = currentTimeMillis() + 
TimeUnit.SECONDS.toMillis(30);
+
+        do
+        {
+            // Make sure at least two load attempts have happened,
+            // in case we received a node up event about this node
+            if (StorageProxy.instance.getPartitionDenylistLoadAttempts() > 2)
+            {
+                return;
+            }
+            Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+        } while (currentTimeMillis() < deadlineInMillis);
+
+        Assert.fail("Node did not retry loading on timeout in 30s");
+    }
+
+    @Test
+    public void checkTimerRetriesLoad() throws IOException
+    {
+        int nodeCount = 3;
+
+        try (Cluster cluster = Cluster.build(nodeCount)
+                                      .withConfig(config -> config
+                                      .with(NETWORK)
+                                      .with(GOSSIP)
+                                      .set("enable_partition_denylist", true)
+                                      
.set("denylist_initial_load_retry_seconds", 1))
+                                      .createWithoutStarting())
+        {
+            // Start only a single node of the cluster, in the hope it doesn't 
trigger
+            // node lifecycle events when the remaining nodes start up.
+            cluster.get(1).startup();
+            
cluster.get(1).runOnInstance(PartitionDenylistTest::checkTimerActive);
+        }
+    }
+}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/metric/TableMetricTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/metric/TableMetricTest.java
index 3997e4a..0e915d3 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/metric/TableMetricTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/metric/TableMetricTest.java
@@ -41,7 +41,7 @@ import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
 import org.apache.cassandra.distributed.test.TestBaseImpl;
-import org.apache.cassandra.repair.SystemDistributedKeyspace;
+import org.apache.cassandra.schema.SystemDistributedKeyspace;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.SchemaKeyspace;
 import org.apache.cassandra.tracing.TraceKeyspace;
diff --git 
a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java 
b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
index 3478fb3..20fc8b5 100644
--- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
+++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
@@ -129,6 +129,7 @@ public class DatabaseDescriptorRefTest
     "org.apache.cassandra.exceptions.ConfigurationException",
     "org.apache.cassandra.exceptions.RequestValidationException",
     "org.apache.cassandra.exceptions.CassandraException",
+    "org.apache.cassandra.exceptions.InvalidRequestException",
     "org.apache.cassandra.exceptions.TransportException",
     "org.apache.cassandra.fql.FullQueryLogger",
     "org.apache.cassandra.fql.FullQueryLoggerOptions",
@@ -152,12 +153,14 @@ public class DatabaseDescriptorRefTest
     "org.apache.cassandra.io.util.SpinningDiskOptimizationStrategy",
     "org.apache.cassandra.io.util.PathUtils$IOToLongFunction",
     "org.apache.cassandra.locator.Replica",
+    "org.apache.cassandra.locator.ReplicaCollection",
     "org.apache.cassandra.locator.SimpleSeedProvider",
     "org.apache.cassandra.locator.SeedProvider",
     "org.apache.cassandra.security.ISslContextFactory",
     "org.apache.cassandra.security.SSLFactory",
     "org.apache.cassandra.security.EncryptionContext",
     "org.apache.cassandra.service.CacheService$CacheType",
+    "org.apache.cassandra.transport.ProtocolException",
     "org.apache.cassandra.utils.binlog.BinLogOptions",
     "org.apache.cassandra.utils.FBUtilities",
     "org.apache.cassandra.utils.FBUtilities$1",
diff --git a/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java 
b/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
index 065f557..d389a6c 100644
--- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
+++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
@@ -25,6 +25,7 @@ import java.net.NetworkInterface;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Enumeration;
+import java.util.function.Consumer;
 
 
 import com.google.common.base.Throwables;
@@ -37,6 +38,7 @@ import org.apache.cassandra.exceptions.ConfigurationException;
 import org.assertj.core.api.Assertions;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -592,6 +594,26 @@ public class DatabaseDescriptorTest
         Assert.assertEquals(1, 
DatabaseDescriptor.tokensFromString(config.initial_token).size());
     }
 
+    @Test
+    public void testDenylistInvalidValuesRejected()
+    {
+        DatabaseDescriptor.loadConfig();
+
+        
expectIllegalArgumentException(DatabaseDescriptor::setDenylistRefreshSeconds, 
0, "denylist_refresh_seconds must be a positive integer.");
+        
expectIllegalArgumentException(DatabaseDescriptor::setDenylistRefreshSeconds, 
-1, "denylist_refresh_seconds must be a positive integer.");
+        
expectIllegalArgumentException(DatabaseDescriptor::setDenylistMaxKeysPerTable, 
0, "denylist_max_keys_per_table must be a positive integer.");
+        
expectIllegalArgumentException(DatabaseDescriptor::setDenylistMaxKeysPerTable, 
-1, "denylist_max_keys_per_table must be a positive integer.");
+        
expectIllegalArgumentException(DatabaseDescriptor::setDenylistMaxKeysTotal, 0, 
"denylist_max_keys_total must be a positive integer.");
+        
expectIllegalArgumentException(DatabaseDescriptor::setDenylistMaxKeysTotal, -1, 
"denylist_max_keys_total must be a positive integer.");
+    }
+
+    private void expectIllegalArgumentException(Consumer<Integer> c, int val, 
String expectedMessage)
+    {
+        assertThatThrownBy(() -> c.accept(val))
+            .isInstanceOf(IllegalArgumentException.class)
+            .hasMessageContaining(expectedMessage);
+    }
+
     // coordinator read
     @Test
     public void testClientLargeReadWarnAndAbortNegative()
diff --git a/test/unit/org/apache/cassandra/service/PartitionDenylistTest.java 
b/test/unit/org/apache/cassandra/service/PartitionDenylistTest.java
new file mode 100644
index 0000000..0751036
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/PartitionDenylistTest.java
@@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.Tables;
+
+import static org.apache.cassandra.cql3.QueryProcessor.process;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+public class PartitionDenylistTest
+{
+    private final static String ks_cql = "partition_denylist_keyspace";
+
+    @BeforeClass
+    public static void init()
+    {
+        CQLTester.prepareServer();
+
+        KeyspaceMetadata schema = KeyspaceMetadata.create(ks_cql,
+                                                          
KeyspaceParams.simple(1),
+                                                          Tables.of(
+            CreateTableStatement.parse("CREATE TABLE table1 ("
+                                       + "keyone text, "
+                                       + "keytwo text, "
+                                       + "qux text, "
+                                       + "quz text, "
+                                       + "foo text, "
+                                       + "PRIMARY KEY((keyone, keytwo), qux, 
quz) ) ", ks_cql).build(),
+            CreateTableStatement.parse("CREATE TABLE table2 ("
+                                       + "keyone text, "
+                                       + "keytwo text, "
+                                       + "keythree text, "
+                                       + "value text, "
+                                       + "PRIMARY KEY((keyone, keytwo), 
keythree) ) ", ks_cql).build(),
+            CreateTableStatement.parse("CREATE TABLE table3 ("
+                                       + "keyone text, "
+                                       + "keytwo text, "
+                                       + "keythree text, "
+                                       + "value text, "
+                                       + "PRIMARY KEY((keyone, keytwo), 
keythree) ) ", ks_cql).build()
+        ));
+        Schema.instance.load(schema);
+        DatabaseDescriptor.setEnablePartitionDenylist(true);
+        DatabaseDescriptor.setEnableDenylistRangeReads(true);
+        DatabaseDescriptor.setDenylistConsistencyLevel(ConsistencyLevel.ONE);
+        DatabaseDescriptor.setDenylistRefreshSeconds(1);
+        StorageService.instance.initServer(0);
+    }
+
+    @Before
+    public void setup()
+    {
+        DatabaseDescriptor.setEnablePartitionDenylist(true);
+        resetDenylist();
+
+        process("INSERT INTO " + ks_cql + ".table1 (keyone, keytwo, qux, quz, 
foo) VALUES ('aaa', 'bbb', 'ccc', 'ddd', 'v')", ConsistencyLevel.ONE);
+        process("INSERT INTO " + ks_cql + ".table1 (keyone, keytwo, qux, quz, 
foo) VALUES ('bbb', 'ccc', 'ccc', 'ddd', 'v')", ConsistencyLevel.ONE);
+        process("INSERT INTO " + ks_cql + ".table1 (keyone, keytwo, qux, quz, 
foo) VALUES ('ccc', 'ddd', 'ccc', 'ddd', 'v')", ConsistencyLevel.ONE);
+        process("INSERT INTO " + ks_cql + ".table1 (keyone, keytwo, qux, quz, 
foo) VALUES ('ddd', 'eee', 'ccc', 'ddd', 'v')", ConsistencyLevel.ONE);
+        process("INSERT INTO " + ks_cql + ".table1 (keyone, keytwo, qux, quz, 
foo) VALUES ('eee', 'fff', 'ccc', 'ddd', 'v')", ConsistencyLevel.ONE);
+        process("INSERT INTO " + ks_cql + ".table1 (keyone, keytwo, qux, quz, 
foo) VALUES ('fff', 'ggg', 'ccc', 'ddd', 'v')", ConsistencyLevel.ONE);
+        process("INSERT INTO " + ks_cql + ".table1 (keyone, keytwo, qux, quz, 
foo) VALUES ('ggg', 'hhh', 'ccc', 'ddd', 'v')", ConsistencyLevel.ONE);
+        process("INSERT INTO " + ks_cql + ".table1 (keyone, keytwo, qux, quz, 
foo) VALUES ('hhh', 'iii', 'ccc', 'ddd', 'v')", ConsistencyLevel.ONE);
+        process("INSERT INTO " + ks_cql + ".table1 (keyone, keytwo, qux, quz, 
foo) VALUES ('iii', 'jjj', 'ccc', 'ddd', 'v')", ConsistencyLevel.ONE);
+        process("INSERT INTO " + ks_cql + ".table1 (keyone, keytwo, qux, quz, 
foo) VALUES ('jjj', 'kkk', 'ccc', 'ddd', 'v')", ConsistencyLevel.ONE);
+
+
+        for (int i = 0; i < 50; i++)
+            process(String.format("INSERT INTO " + ks_cql + ".table2 (keyone, 
keytwo, keythree, value) VALUES ('%d', '%d', '%d', '%d')", i, i, i, i), 
ConsistencyLevel.ONE);
+
+        for (int i = 0; i < 50; i++)
+            process(String.format("INSERT INTO " + ks_cql + ".table3 (keyone, 
keytwo, keythree, value) VALUES ('%d', '%d', '%d', '%d')", i, i, i, i), 
ConsistencyLevel.ONE);
+
+        denylist("table1", "bbb:ccc");
+        refreshList();
+    }
+
+
+    private static void denylist(String table, final String key)
+    {
+        StorageProxy.instance.denylistKey(ks_cql, table, key);
+    }
+
+    private static void refreshList()
+    {
+        StorageProxy.instance.loadPartitionDenylist();
+    }
+
+    /**
+     * @return Whether the *attempt* to remove the denylisted key and refresh 
succeeded. Doesn't necessarily indicate the key
+     * was previously blocked and found.
+     */
+    private static boolean removeDenylist(final String ks, final String table, 
final String key)
+    {
+        return StorageProxy.instance.removeDenylistKey(ks, table, key);
+    }
+
+    @Test
+    public void testRead()
+    {
+        process("SELECT * FROM " + ks_cql + ".table1 WHERE keyone='aaa' and 
keytwo='bbb'", ConsistencyLevel.ONE);
+    }
+
+    @Test
+    public void testReadDenylisted()
+    {
+        assertThatThrownBy(() -> process("SELECT * FROM " + ks_cql + ".table1 
WHERE keyone='bbb' and keytwo='ccc'", ConsistencyLevel.ONE))
+                           .isInstanceOf(InvalidRequestException.class)
+                           .hasMessageContaining("Unable to read denylisted 
partition");
+    }
+
+    @Test
+    public void testIsKeyDenylistedAPI()
+    {
+        Assert.assertTrue(StorageProxy.instance.isKeyDenylisted(ks_cql, 
"table1", "bbb:ccc"));
+        resetDenylist();
+        Assert.assertFalse(StorageProxy.instance.isKeyDenylisted(ks_cql, 
"table1", "bbb:ccc"));
+
+        // Confirm an add mutates cache state
+        denylist("table1", "bbb:ccc");
+        Assert.assertTrue(StorageProxy.instance.isKeyDenylisted(ks_cql, 
"table1", "bbb:ccc"));
+
+        // Confirm removal then mutates cache w/out explicit reload
+        StorageProxy.instance.removeDenylistKey(ks_cql, "table1", "bbb:ccc");
+        Assert.assertFalse(StorageProxy.instance.isKeyDenylisted(ks_cql, 
"table1", "bbb:ccc"));
+    }
+
+    @Test
+    public void testReadUndenylisted()
+    {
+        resetDenylist();
+        process("SELECT * FROM " + ks_cql + ".table1 WHERE keyone='ccc' and 
keytwo='ddd'", ConsistencyLevel.ONE);
+    }
+
+    @Test
+    public void testWrite()
+    {
+        process("INSERT INTO " + ks_cql + ".table1 (keyone, keytwo, qux, quz, 
foo) VALUES ('eee', 'fff', 'ccc', 'ddd', 'v')", ConsistencyLevel.ONE);
+        process("DELETE FROM " + ks_cql + ".table1 WHERE keyone='eee' and 
keytwo='fff'", ConsistencyLevel.ONE);
+    }
+
+    @Test
+    public void testWriteDenylisted()
+    {
+        assertThatThrownBy(() -> process("INSERT INTO " + ks_cql + ".table1 
(keyone, keytwo, qux, quz, foo) VALUES ('bbb', 'ccc', 'eee', 'fff', 'w')", 
ConsistencyLevel.ONE))
+                           .isInstanceOf(InvalidRequestException.class)
+                           .hasMessageContaining("Unable to write to 
denylisted partition");
+    }
+
+    @Test
+    public void testCASWriteDenylisted()
+    {
+        assertThatThrownBy(() -> process("UPDATE " + ks_cql + ".table1 SET 
foo='w' WHERE keyone='bbb' AND keytwo='ccc' AND qux='eee' AND quz='fff' IF 
foo='v'", ConsistencyLevel.LOCAL_SERIAL))
+                           .isInstanceOf(InvalidRequestException.class)
+                           .hasMessageContaining("Unable to CAS write to 
denylisted partition");
+    }
+
+    @Test
+    public void testWriteUndenylisted()
+    {
+        resetDenylist();
+        process("INSERT INTO " + ks_cql + ".table1 (keyone, keytwo, qux, quz, 
foo) VALUES ('bbb', 'ccc', 'eee', 'fff', 'w')", ConsistencyLevel.ONE);
+    }
+
+    @Test
+    public void testRangeSlice()
+    {
+        UntypedResultSet rows;
+        rows = process("SELECT * FROM " + ks_cql + ".table1 WHERE 
token(keyone, keytwo) < token('bbb', 'ccc')", ConsistencyLevel.ONE);
+        Assert.assertEquals(1, rows.size());
+
+        // 10 entries total in our table
+        rows = process("SELECT * FROM " + ks_cql + ".table1 WHERE 
token(keyone, keytwo) > token('bbb', 'ccc')", ConsistencyLevel.ONE);
+        Assert.assertEquals(8, rows.size());
+
+        rows = process("SELECT * FROM " + ks_cql + ".table1 WHERE 
token(keyone, keytwo) >= token('aaa', 'bbb') and token(keyone, keytwo) < 
token('bbb', 'ccc')", ConsistencyLevel.ONE);
+        Assert.assertEquals(1, rows.size());
+
+        rows = process("SELECT * FROM " + ks_cql + ".table1 WHERE 
token(keyone, keytwo) > token('bbb', 'ccc') and token(keyone, keytwo) <= 
token('ddd', 'eee')", ConsistencyLevel.ONE);
+        Assert.assertEquals(2, rows.size());
+    }
+
+    @Test
+    public void testRangeDenylisted()
+    {
+        assertThatThrownBy(() -> process("SELECT * FROM " + ks_cql + 
".table1", ConsistencyLevel.ONE))
+                           .isInstanceOf(InvalidRequestException.class)
+                           .hasMessageContaining("Attempted to read a range 
containing 1 denylisted keys");
+    }
+
+    @Test
+    public void testRangeDenylisted2()
+    {
+        assertThatThrownBy(() -> process("SELECT * FROM " + ks_cql + ".table1 
WHERE token(keyone, keytwo) >= token('aaa', 'bbb') and token (keyone, keytwo) 
<= token('bbb', 'ccc')", ConsistencyLevel.ONE))
+                           .isInstanceOf(InvalidRequestException.class)
+                           .hasMessageContaining("Attempted to read a range 
containing 1 denylisted keys");
+    }
+
+    @Test
+    public void testRangeDenylisted3()
+    {
+        assertThatThrownBy(() -> process("SELECT * FROM " + ks_cql + ".table1 
WHERE token(keyone, keytwo) >= token('bbb', 'ccc') and token (keyone, keytwo) 
<= token('ccc', 'ddd')", ConsistencyLevel.ONE))
+                           .isInstanceOf(InvalidRequestException.class)
+                           .hasMessageContaining("Attempted to read a range 
containing 1 denylisted keys");
+    }
+
+    @Test
+    public void testRangeDenylisted4()
+    {
+        assertThatThrownBy(() -> process("SELECT * FROM " + ks_cql + ".table1 
WHERE token(keyone, keytwo) > token('aaa', 'bbb') and token (keyone, keytwo) < 
token('ccc', 'ddd')", ConsistencyLevel.ONE))
+                           .isInstanceOf(InvalidRequestException.class)
+                           .hasMessageContaining("Attempted to read a range 
containing 1 denylisted keys");
+    }
+
+    @Test
+    public void testRangeDenylisted5()
+    {
+        assertThatThrownBy(() -> process("SELECT * FROM " + ks_cql + ".table1 
WHERE token(keyone, keytwo) > token('aaa', 'bbb')", ConsistencyLevel.ONE))
+                           .isInstanceOf(InvalidRequestException.class)
+                           .hasMessageContaining("Attempted to read a range 
containing 1 denylisted keys");
+    }
+
+    @Test
+    public void testRangeDenylisted6()
+    {
+        assertThatThrownBy(() -> process("SELECT * FROM " + ks_cql + ".table1 
WHERE token(keyone, keytwo) < token('ddd', 'eee')", ConsistencyLevel.ONE))
+                           .isInstanceOf(InvalidRequestException.class)
+                           .hasMessageContaining("Attempted to read a range 
containing 1 denylisted keys");
+    }
+
+    @Test
+    public void testInsertUnknownPKIsGraceful()
+    {
+        Assert.assertTrue(StorageProxy.instance.denylistKey(ks_cql, "table1", 
"hohoho"));
+    }
+
+    @Test
+    public void testInsertInvalidTableIsGraceful()
+    {
+        Assert.assertFalse(StorageProxy.instance.denylistKey(ks_cql, 
"asldkfjadlskjf", "alksdjfads"));
+    }
+
+    @Test
+    public void testInsertInvalidKSIsGraceful()
+    {
+        Assert.assertFalse(StorageProxy.instance.denylistKey("asdklfjas", 
"asldkfjadlskjf", "alksdjfads"));
+    }
+
+    @Test
+    public void testDisabledDenylistThrowsNoExceptions()
+    {
+        process(String.format("TRUNCATE TABLE %s.table2", ks_cql), 
ConsistencyLevel.ONE);
+        process(String.format("TRUNCATE TABLE %s.table3", ks_cql), 
ConsistencyLevel.ONE);
+        denyAllKeys();
+        DatabaseDescriptor.setEnablePartitionDenylist(false);
+        process("INSERT INTO " + ks_cql + ".table1 (keyone, keytwo, qux, quz, 
foo) VALUES ('bbb', 'ccc', 'eee', 'fff', 'w')", ConsistencyLevel.ONE);
+        process("SELECT * FROM " + ks_cql + ".table1 WHERE keyone='bbb' and 
keytwo='ccc'", ConsistencyLevel.ONE);
+        process("SELECT * FROM " + ks_cql + ".table1", ConsistencyLevel.ONE);
+
+        for (int i = 0; i < 50; i++)
+        {
+            process(String.format("INSERT INTO %s.table2 (keyone, keytwo, 
keythree, value) VALUES ('%s', '%s', '%s', '%s')", ks_cql, i, i, i, i), 
ConsistencyLevel.ONE);
+            process(String.format("SELECT * FROM %s.table2 WHERE keyone='%s' 
and keytwo='%s'", ks_cql, i, i), ConsistencyLevel.ONE);
+        }
+
+        for (int i = 0; i < 50; i++)
+        {
+            process(String.format("INSERT INTO %s.table3 (keyone, keytwo, 
keythree, value) VALUES ('%s', '%s', '%s', '%s')", ks_cql, i, i, i, i), 
ConsistencyLevel.ONE);
+            process(String.format("SELECT * FROM %s.table3 WHERE keyone='%s' 
and keytwo='%s'", ks_cql, i, i), ConsistencyLevel.ONE);
+        }
+    }
+
+    /**
+     * Want to make sure we don't throw anything or explode when people try to 
remove a key that's not there
+     */
+    @Test
+    public void testRemoveMissingIsGraceful()
+    {
+        confirmDenied("table1", "bbb", "ccc");
+        Assert.assertTrue(removeDenylist(ks_cql, "table1", "bbb:ccc"));
+
+        // We expect this to silently not find and succeed at *trying* to 
remove it
+        Assert.assertTrue(removeDenylist(ks_cql, "table1", "bbb:ccc"));
+        refreshList();
+
+        confirmAllowed("table1", "bbb", "ccc");
+    }
+
+    /**
+     * We need to confirm that the entire cache is reloaded rather than being 
an additive change; we don't want keys to
+     * persist after their removal and reload from CQL.
+     */
+    @Test
+    public void testRemoveWorksOnReload()
+    {
+        denyAllKeys();
+        refreshList();
+
+        confirmDenied("table1", "aaa", "bbb");
+        confirmDenied("table1", "eee", "fff");
+        confirmDenied("table1", "iii", "jjj");
+
+        // poke a hole in the middle and reload
+        removeDenylist(ks_cql, "table1", "eee:fff");
+        refreshList();
+
+        confirmAllowed("table1", "eee", "fff");
+        confirmDenied("table1", "aaa", "bbb");
+        confirmDenied("table1", "iii", "jjj");
+    }
+
+    /**
+     * We go through a few steps here:
+     *  1) Add more keys than we're allowed
+     *  2) Confirm that the overflow keys are *not* denied
+     *  3) Raise the allowable limit
+     *  4) Confirm that the overflow keys are now denied (and no longer really 
"overflow" for that matter)
+     */
+    @Test
+    public void testShrinkAndGrow()
+    {
+        denyAllKeys();
+        refreshList();
+
+        // Initial control; check denial of both initial and final keys
+        confirmDenied("table1", "aaa", "bbb");
+        confirmDenied("table1", "iii", "jjj");
+
+        // Lower our limit to 5 allowable denies and then check and see that 
things past the limit are ignored
+        StorageProxy.instance.setDenylistMaxKeysPerTable(5);
+        StorageProxy.instance.setDenylistMaxKeysTotal(5);
+        refreshList();
+
+        // Confirm overflowed keys are allowed; first come first served
+        confirmDenied("table1", "aaa", "bbb");
+        confirmAllowed("table1", "iii", "jjj");
+
+        // Now we raise the limit back up and do nothing else and confirm it's 
blocked
+        StorageProxy.instance.setDenylistMaxKeysPerTable(1000);
+        StorageProxy.instance.setDenylistMaxKeysTotal(1000);
+        refreshList();
+        confirmDenied("table1", "aaa", "bbb");
+        confirmDenied("table1", "iii", "jjj");
+
+        // Unblock via overflow the table 1 sentinel we'll check in a second
+        StorageProxy.instance.setDenylistMaxKeysPerTable(5);
+        StorageProxy.instance.setDenylistMaxKeysTotal(5);
+        refreshList();
+        confirmAllowed("table1", "iii", "jjj");
+
+        // Now, we remove the denylist entries for our first 5, drop the limit 
back down, and confirm those overflowed keys now block
+        removeDenylist(ks_cql, "table1", "aaa:bbb");
+        removeDenylist(ks_cql, "table1", "bbb:ccc");
+        removeDenylist(ks_cql, "table1", "ccc:ddd");
+        removeDenylist(ks_cql, "table1", "ddd:eee");
+        removeDenylist(ks_cql, "table1", "eee:fff");
+        refreshList();
+        confirmDenied("table1", "iii", "jjj");
+    }
+
+    /*
+    We need to test that, during a violation of our global allowable limit, we 
still enforce our limit of keys queried
+    on a per-table basis.
+     */
+    @Test
+    public void testTableLimitRespected()
+    {
+        StorageProxy.instance.setDenylistMaxKeysPerTable(5);
+        StorageProxy.instance.setDenylistMaxKeysTotal(12);
+        denyAllKeys();
+        refreshList();
+
+        // Table 1: expect first 5 denied
+        confirmDenied("table1", "aaa", "bbb");
+        confirmDenied("table1", "eee", "fff");
+        confirmAllowed("table1", "fff", "ggg");
+
+        // Table 2: expect first 5 denied
+        for (int i = 0; i < 5; i++)
+            confirmDenied("table2", Integer.toString(i), Integer.toString(i));
+
+        // Confirm remainder are allowed because we hit our table limit at 5
+        for (int i = 5; i < 50; i++)
+            confirmAllowed("table2", Integer.toString(i), Integer.toString(i));
+
+        // Table 3: expect only first 2 denied; global limit enforcement
+        confirmDenied("table3", "0", "0");
+        confirmDenied("table3", "1", "1");
+
+        // And our final 48 should be allowed
+        for (int i = 2; i < 50; i++)
+            confirmAllowed("table3", Integer.toString(i), Integer.toString(i));
+    }
+
+    /**
+     * Test that we don't allow overflowing global limit due to accumulation 
of allowable table queries
+     */
+    @Test
+    public void testGlobalLimitRespected()
+    {
+        StorageProxy.instance.setDenylistMaxKeysPerTable(50);
+        StorageProxy.instance.setDenylistMaxKeysTotal(15);
+        denyAllKeys();
+        refreshList();
+
+        // Table 1: expect all 10 denied
+        confirmDenied("table1", "aaa", "bbb");
+        confirmDenied("table1", "jjj", "kkk");
+
+        // Table 2: expect only 5 denied up to global limit trigger
+        for (int i = 0; i < 5; i++)
+            confirmDenied("table2", Integer.toString(i), Integer.toString(i));
+
+        // Remainder of Table 2 should be allowed; testing overflow boundary 
logic
+        for (int i = 5; i < 50; i++)
+            confirmAllowed("table2", Integer.toString(i), Integer.toString(i));
+
+        // Table 3: expect all allowed as we're past global limit by the time 
we got to this table load. This confirms that
+        // we bypass load completely on tables once we're at our global limit.
+        for (int i = 0; i < 50; i++)
+            confirmAllowed("table3", Integer.toString(i), Integer.toString(i));
+    }
+
+    private void confirmDenied(String table, String keyOne, String keyTwo)
+    {
+        String query = String.format("SELECT * FROM " + ks_cql + "." + table + 
" WHERE keyone='%s' and keytwo='%s'", keyOne, keyTwo);
+        assertThatThrownBy(() -> process(query, ConsistencyLevel.ONE))
+                           .isInstanceOf(InvalidRequestException.class)
+                           .hasMessageContaining("Unable to read denylisted 
partition");
+    }
+
+    private void confirmAllowed(String table, String keyOne, String keyTwo)
+    {
+        process(String.format("SELECT * FROM %s.%s WHERE keyone='%s' and 
keytwo='%s'", ks_cql, table, keyOne, keyTwo), ConsistencyLevel.ONE);
+    }
+
+    private void resetDenylist()
+    {
+        process("TRUNCATE system_distributed.partition_denylist", 
ConsistencyLevel.ONE);
+        StorageProxy.instance.setDenylistMaxKeysTotal(1000);
+        StorageProxy.instance.setDenylistMaxKeysPerTable(1000);
+        StorageProxy.instance.loadPartitionDenylist();
+    }
+
+    private void denyAllKeys()
+    {
+        denylist("table1", "aaa:bbb");
+        denylist("table1", "bbb:ccc");
+        denylist("table1", "ccc:ddd");
+        denylist("table1", "ddd:eee");
+        denylist("table1", "eee:fff");
+        denylist("table1", "fff:ggg");
+        denylist("table1", "ggg:hhh");
+        denylist("table1", "hhh:iii");
+        denylist("table1", "iii:jjj");
+        denylist("table1", "jjj:kkk");
+
+        for (int i = 0; i < 50; i++)
+        {
+            denylist("table2", String.format("%d:%d", i, i));
+            denylist("table3", String.format("%d:%d", i, i));
+        }
+
+    }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org

Reply via email to