This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d220d24  Request-Based Native Transport Rate-Limiting
d220d24 is described below

commit d220d24994400d4342f5281f1a51514a6ae8c2fd
Author: Caleb Rackliffe <[email protected]>
AuthorDate: Thu Aug 19 10:55:58 2021 -0500

    Request-Based Native Transport Rate-Limiting
    
    patch by Caleb Rackliffe; reviewed by Benedict Elliott Smith and Josh 
McKenzie for CASSANDRA-16663
---
 CHANGES.txt                                        |   1 +
 src/java/org/apache/cassandra/config/Config.java   |   4 +-
 .../cassandra/config/DatabaseDescriptor.java       |  27 ++
 .../cassandra/net/AbstractMessageHandler.java      |  21 +-
 .../org/apache/cassandra/net/FrameDecoder.java     |  13 +-
 .../cassandra/net/InboundMessageHandler.java       |   2 -
 .../org/apache/cassandra/net/ResourceLimits.java   |   4 +-
 .../apache/cassandra/service/StorageService.java   |  24 ++
 .../cassandra/service/StorageServiceMBean.java     |   6 +-
 .../cassandra/transport/CQLMessageHandler.java     | 254 +++++++++++++----
 .../cassandra/transport/ClientResourceLimits.java  |  41 ++-
 .../org/apache/cassandra/transport/Dispatcher.java |  37 ++-
 .../cassandra/transport/ExceptionHandlers.java     |  44 +--
 .../transport/InitialConnectionHandler.java        |   3 +-
 .../apache/cassandra/transport/PreV5Handlers.java  | 186 ++++++++-----
 .../apache/cassandra/transport/SimpleClient.java   |  51 ++--
 .../utils/concurrent/NonBlockingRateLimiter.java   | 188 +++++++++++++
 .../apache/cassandra/transport/BurnTestUtil.java   |   9 +
 .../cassandra/transport/SimpleClientPerfTest.java  |  81 ++++--
 test/unit/org/apache/cassandra/cql3/CQLTester.java |  35 ++-
 .../apache/cassandra/net/ResourceLimitsTest.java   |   2 -
 .../cassandra/transport/CQLConnectionTest.java     |  11 +-
 .../transport/ClientResourceLimitsTest.java        | 101 +++----
 .../cassandra/transport/RateLimitingTest.java      | 309 +++++++++++++++++++++
 .../concurrent/NonBlockingRateLimiterTest.java     | 121 ++++++++
 25 files changed, 1320 insertions(+), 255 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 5ae32a5..275ebc7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.1
+ * Request-Based Native Transport Rate-Limiting (CASSANDRA-16663)
  * Implement nodetool getauditlog command (CASSANDRA-16725)
  * Clean up repair code (CASSANDRA-13720)
  * Background schedule to clean up orphaned hints files (CASSANDRA-16815)
diff --git a/src/java/org/apache/cassandra/config/Config.java 
b/src/java/org/apache/cassandra/config/Config.java
index 9a01db6..f7edda0 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -34,8 +34,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.audit.AuditLogOptions;
-import org.apache.cassandra.fql.FullQueryLoggerOptions;
 import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.fql.FullQueryLoggerOptions;
 
 /**
  * A class that contains configuration properties for the cassandra node it 
runs within.
@@ -193,6 +193,8 @@ public class Config
     public volatile boolean native_transport_allow_older_protocols = true;
     public volatile long 
native_transport_max_concurrent_requests_in_bytes_per_ip = -1L;
     public volatile long native_transport_max_concurrent_requests_in_bytes = 
-1L;
+    public volatile boolean native_transport_rate_limiting_enabled = false;
+    public volatile int native_transport_max_requests_per_second = 1000000;
     public int native_transport_receive_queue_capacity_in_bytes = 1 << 20; // 
1MiB
 
     @Deprecated
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index f2122b1..722e63d 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -540,6 +540,11 @@ public class DatabaseDescriptor
         {
             conf.native_transport_max_concurrent_requests_in_bytes_per_ip = 
Runtime.getRuntime().maxMemory() / 40;
         }
+        
+        if (conf.native_transport_rate_limiting_enabled)
+            logger.info("Native transport rate-limiting enabled at {} 
requests/second.", conf.native_transport_max_requests_per_second);
+        else
+            logger.info("Native transport rate-limiting disabled.");
 
         if (conf.commitlog_total_space_in_mb == null)
         {
@@ -2324,6 +2329,28 @@ public class DatabaseDescriptor
         conf.native_transport_max_concurrent_requests_in_bytes = 
maxConcurrentRequestsInBytes;
     }
 
+    public static int getNativeTransportMaxRequestsPerSecond()
+    {
+        return conf.native_transport_max_requests_per_second;
+    }
+
+    public static void setNativeTransportMaxRequestsPerSecond(int perSecond)
+    {
+        Preconditions.checkArgument(perSecond > 0, 
"native_transport_max_requests_per_second must be greater than zero");
+        conf.native_transport_max_requests_per_second = perSecond;
+    }
+
+    public static void setNativeTransportRateLimitingEnabled(boolean enabled)
+    {
+        logger.info("native_transport_rate_limiting_enabled set to {}", 
enabled);
+        conf.native_transport_rate_limiting_enabled = enabled;
+    }
+
+    public static boolean getNativeTransportRateLimitingEnabled()
+    {
+        return conf.native_transport_rate_limiting_enabled;
+    }
+
     public static int getCommitLogSyncPeriod()
     {
         return conf.commitlog_sync_period_in_ms;
diff --git a/src/java/org/apache/cassandra/net/AbstractMessageHandler.java 
b/src/java/org/apache/cassandra/net/AbstractMessageHandler.java
index d709729..1045f28 100644
--- a/src/java/org/apache/cassandra/net/AbstractMessageHandler.java
+++ b/src/java/org/apache/cassandra/net/AbstractMessageHandler.java
@@ -22,7 +22,6 @@ import java.util.ArrayList;
 import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 
@@ -34,13 +33,13 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.EventLoop;
+import org.apache.cassandra.metrics.ClientMetrics;
 import org.apache.cassandra.net.FrameDecoder.CorruptFrame;
 import org.apache.cassandra.net.FrameDecoder.Frame;
 import org.apache.cassandra.net.FrameDecoder.FrameProcessor;
 import org.apache.cassandra.net.FrameDecoder.IntactFrame;
 import org.apache.cassandra.net.Message.Header;
 import org.apache.cassandra.net.ResourceLimits.Limit;
-import org.apache.cassandra.utils.NoSpamLogger;
 
 import static java.lang.Math.max;
 import static java.lang.Math.min;
@@ -98,8 +97,8 @@ import static 
org.apache.cassandra.utils.MonotonicClock.approxTime;
  * the untouched frames to the correct thread pool for the verb to be 
deserialized there and immediately processed.
  *
  * See {@link LargeMessage} and subclasses for concrete {@link 
AbstractMessageHandler} implementations for details
- * of the large-message accumulating state-machine, and {@link ProcessMessage} 
and its inheritors for the differences
- *in execution.
+ * of the large-message accumulating state-machine, and {@link 
InboundMessageHandler.ProcessMessage} and its inheritors 
+ * for the differences in execution.
  *
  * # Flow control (backpressure)
  *
@@ -135,8 +134,7 @@ import static 
org.apache.cassandra.utils.MonotonicClock.approxTime;
 public abstract class AbstractMessageHandler extends 
ChannelInboundHandlerAdapter implements FrameProcessor
 {
     private static final Logger logger = 
LoggerFactory.getLogger(AbstractMessageHandler.class);
-    private static final NoSpamLogger noSpamLogger = 
NoSpamLogger.getLogger(logger, 1L, TimeUnit.SECONDS);
-
+    
     protected final FrameDecoder decoder;
 
     protected final Channel channel;
@@ -304,12 +302,17 @@ public abstract class AbstractMessageHandler extends 
ChannelInboundHandlerAdapte
         try
         {
             /*
-             * Process up to one message using supplied overriden reserves - 
one of them pre-allocated,
-             * and guaranteed to be enough for one message - then, if no 
obstacles enountered, reactivate
+             * Process up to one message using supplied overridden reserves - 
one of them pre-allocated,
+             * and guaranteed to be enough for one message - then, if no 
obstacles encountered, reactivate
              * the frame decoder using normal reserve capacities.
              */
             if (processUpToOneMessage(endpointReserve, globalReserve))
+            {
                 decoder.reactivate();
+
+                if (decoder.isActive())
+                    ClientMetrics.instance.unpauseConnection();
+            }
         }
         catch (Throwable t)
         {
@@ -321,7 +324,7 @@ public abstract class AbstractMessageHandler extends 
ChannelInboundHandlerAdapte
 
     // return true if the handler should be reactivated - if no new hurdles 
were encountered,
     // like running out of the other kind of reserve capacity
-    private boolean processUpToOneMessage(Limit endpointReserve, Limit 
globalReserve) throws IOException
+    protected boolean processUpToOneMessage(Limit endpointReserve, Limit 
globalReserve) throws IOException
     {
         UpToOneMessageFrameProcessor processor = new 
UpToOneMessageFrameProcessor(endpointReserve, globalReserve);
         decoder.processBacklog(processor);
diff --git a/src/java/org/apache/cassandra/net/FrameDecoder.java 
b/src/java/org/apache/cassandra/net/FrameDecoder.java
index 64e30ef..4cfbf6d 100644
--- a/src/java/org/apache/cassandra/net/FrameDecoder.java
+++ b/src/java/org/apache/cassandra/net/FrameDecoder.java
@@ -191,6 +191,14 @@ public abstract class FrameDecoder extends 
ChannelInboundHandlerAdapter
     abstract void addLastTo(ChannelPipeline pipeline);
 
     /**
+     * @return true if we are actively decoding and processing frames
+     */
+    public boolean isActive()
+    {
+        return isActive;
+    }
+    
+    /**
      * For use by InboundMessageHandler (or other upstream handlers) that want 
to start receiving frames.
      */
     public void activate(FrameProcessor processor)
@@ -208,7 +216,7 @@ public abstract class FrameDecoder extends 
ChannelInboundHandlerAdapter
      * For use by InboundMessageHandler (or other upstream handlers) that want 
to resume
      * receiving frames after previously indicating that processing should be 
paused.
      */
-    void reactivate() throws IOException
+    public void reactivate() throws IOException
     {
         if (isActive)
             throw new IllegalStateException("Tried to reactivate an already 
active FrameDecoder");
@@ -282,7 +290,8 @@ public abstract class FrameDecoder extends 
ChannelInboundHandlerAdapter
     {
         decode(frames, bytes);
 
-        if (isActive) isActive = deliver(processor);
+        if (isActive)
+            isActive = deliver(processor);
     }
 
     @Override
diff --git a/src/java/org/apache/cassandra/net/InboundMessageHandler.java 
b/src/java/org/apache/cassandra/net/InboundMessageHandler.java
index 64d0a8c..f29b3ec 100644
--- a/src/java/org/apache/cassandra/net/InboundMessageHandler.java
+++ b/src/java/org/apache/cassandra/net/InboundMessageHandler.java
@@ -34,8 +34,6 @@ import 
org.apache.cassandra.exceptions.IncompatibleSchemaException;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.Message.Header;
-import org.apache.cassandra.net.FrameDecoder.Frame;
-import org.apache.cassandra.net.FrameDecoder.FrameProcessor;
 import org.apache.cassandra.net.FrameDecoder.IntactFrame;
 import org.apache.cassandra.net.FrameDecoder.CorruptFrame;
 import org.apache.cassandra.net.ResourceLimits.Limit;
diff --git a/src/java/org/apache/cassandra/net/ResourceLimits.java 
b/src/java/org/apache/cassandra/net/ResourceLimits.java
index 8899040..cfddfc3 100644
--- a/src/java/org/apache/cassandra/net/ResourceLimits.java
+++ b/src/java/org/apache/cassandra/net/ResourceLimits.java
@@ -41,7 +41,7 @@ public abstract class ResourceLimits
          * Sets the total amount of permits represented by this {@link Limit} 
- the capacity
          *
          * If the old limit has been reached and the new limit is large enough 
to allow for more
-         * permits to be aqcuired, subsequent calls to {@link #allocate(long)} 
or {@link #tryAllocate(long)}
+         * permits to be acquired, subsequent calls to {@link #allocate(long)} 
or {@link #tryAllocate(long)}
          * will succeed.
          *
          * If the new limit is lower than the current amount of allocated 
permits then subsequent calls
@@ -163,7 +163,7 @@ public abstract class ResourceLimits
                 // possible it would require synchronizing the closing of all 
outbound connections
                 // and reinitializing the Concurrent limit before reopening.  
For such an unlikely path
                 // (previously this was an assert), it is safer to terminate 
the JVM and have something external
-                // restart and get back to a known good state rather than 
intermittendly crashing on any of
+                // restart and get back to a known good state rather than 
intermittently crashing on any of
                 // the connections sharing this limit.
                 throw new UnrecoverableIllegalStateException(
                     "Internode messaging byte limits that are shared between 
connections is invalid (using="+using+")");
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index 396098b..66e1713 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -5875,6 +5875,30 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         ClientResourceLimits.setEndpointLimit(newLimit);
     }
 
+    @Override
+    public int getNativeTransportMaxRequestsPerSecond()
+    {
+        return ClientResourceLimits.getNativeTransportMaxRequestsPerSecond();
+    }
+
+    @Override
+    public void setNativeTransportMaxRequestsPerSecond(int newPerSecond)
+    {
+        
ClientResourceLimits.setNativeTransportMaxRequestsPerSecond(newPerSecond);
+    }
+
+    @Override
+    public void setNativeTransportRateLimitingEnabled(boolean enabled)
+    {
+        DatabaseDescriptor.setNativeTransportRateLimitingEnabled(enabled);
+    }
+
+    @Override
+    public boolean getNativeTransportRateLimitingEnabled()
+    {
+        return DatabaseDescriptor.getNativeTransportRateLimitingEnabled();
+    }
+
     @VisibleForTesting
     public void shutdownServer()
     {
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java 
b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 4622d06..e53eaf9 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -35,7 +35,6 @@ import javax.management.openmbean.TabularData;
 
 import org.apache.cassandra.db.ColumnFamilyStoreMBean;
 import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.locator.InetAddressAndPort;
 
 public interface StorageServiceMBean extends NotificationEmitter
 {
@@ -556,7 +555,10 @@ public interface StorageServiceMBean extends 
NotificationEmitter
     public void setNativeTransportMaxConcurrentRequestsInBytes(long newLimit);
     public long getNativeTransportMaxConcurrentRequestsInBytesPerIp();
     public void setNativeTransportMaxConcurrentRequestsInBytesPerIp(long 
newLimit);
-
+    public int getNativeTransportMaxRequestsPerSecond();
+    public void setNativeTransportMaxRequestsPerSecond(int newPerSecond);
+    public void setNativeTransportRateLimitingEnabled(boolean enabled);
+    public boolean getNativeTransportRateLimitingEnabled();
 
     // allows a node that have been started without joining the ring to join it
     public void joinRing() throws IOException;
diff --git a/src/java/org/apache/cassandra/transport/CQLMessageHandler.java 
b/src/java/org/apache/cassandra/transport/CQLMessageHandler.java
index a9dba8b..bbb8cb5 100644
--- a/src/java/org/apache/cassandra/transport/CQLMessageHandler.java
+++ b/src/java/org/apache/cassandra/transport/CQLMessageHandler.java
@@ -23,6 +23,8 @@ import java.nio.ByteBuffer;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.primitives.Ints;
+import org.apache.cassandra.transport.ClientResourceLimits.Overload;
+import org.apache.cassandra.utils.concurrent.NonBlockingRateLimiter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,7 +44,6 @@ import org.apache.cassandra.net.ResourceLimits.Limit;
 import org.apache.cassandra.net.ShareableBytes;
 import org.apache.cassandra.transport.Flusher.FlushItem.Framed;
 import org.apache.cassandra.transport.messages.ErrorMessage;
-import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.NoSpamLogger;
 
 import static org.apache.cassandra.utils.MonotonicClock.approxTime;
@@ -78,6 +79,7 @@ public class CQLMessageHandler<M extends Message> extends 
AbstractMessageHandler
     private static final NoSpamLogger noSpamLogger = 
NoSpamLogger.getLogger(logger, 1L, TimeUnit.SECONDS);
 
     public static final int LARGE_MESSAGE_THRESHOLD = 
FrameEncoder.Payload.MAX_SIZE - 1;
+    public static final TimeUnit RATE_LIMITER_DELAY_UNIT = 
TimeUnit.NANOSECONDS;
 
     private final Envelope.Decoder envelopeDecoder;
     private final Message.Decoder<M> messageDecoder;
@@ -86,13 +88,14 @@ public class CQLMessageHandler<M extends Message> extends 
AbstractMessageHandler
     private final ErrorHandler errorHandler;
     private final boolean throwOnOverload;
     private final ProtocolVersion version;
+    private final NonBlockingRateLimiter requestRateLimiter;
 
     long channelPayloadBytesInFlight;
     private int consecutiveMessageErrors = 0;
 
     interface MessageConsumer<M extends Message>
     {
-        void accept(Channel channel, M message, Dispatcher.FlushItemConverter 
toFlushItem);
+        void accept(Channel channel, M message, Dispatcher.FlushItemConverter 
toFlushItem, Overload backpressure);
     }
 
     interface ErrorHandler
@@ -129,6 +132,7 @@ public class CQLMessageHandler<M extends Message> extends 
AbstractMessageHandler
         this.errorHandler       = errorHandler;
         this.throwOnOverload    = throwOnOverload;
         this.version            = version;
+        this.requestRateLimiter = resources.requestRateLimiter();
     }
 
     @Override
@@ -139,6 +143,23 @@ public class CQLMessageHandler<M extends Message> extends 
AbstractMessageHandler
         return super.process(frame);
     }
 
+    /**
+     * Checks limits on bytes in flight and the request rate limiter (if 
enabled), then takes one of three actions:
+     * 
+     * 1.) If no limits are breached, process the request.
+     * 2.) If a limit is breached, and the connection is configured to throw 
on overload, throw {@link OverloadedException}.
+     * 3.) If a limit is breached, and the connection is not configurd to 
throw, process the request, and return false
+     *     to let the {@link FrameDecoder} know it should stop processing 
frames.
+     *     
+     * If the connection is configured to throw {@link OverloadedException}, 
requests that breach the rate limit are
+     * not counted against that limit.
+     * 
+     * @return true if the {@link FrameDecoder} should continue to process 
incoming frames, and false if it should stop
+     *         processing them, effectively applying backpressure to clients
+     * 
+     * @throws ErrorMessage.WrappedException with an {@link 
OverloadedException} if overload occurs and the 
+     *         connection is configured to throw on overload
+     */
     protected boolean processOneContainedMessage(ShareableBytes bytes, Limit 
endpointReserve, Limit globalReserve)
     {
         ByteBuffer buf = bytes.get();
@@ -157,41 +178,108 @@ public class CQLMessageHandler<M extends Message> 
extends AbstractMessageHandler
 
         // max CQL message size defaults to 256mb, so should be safe to 
downcast
         int messageSize = Ints.checkedCast(header.bodySizeInBytes);
+        
         if (throwOnOverload)
         {
             if (!acquireCapacity(header, endpointReserve, globalReserve))
             {
-                // discard the request and throw an exception
-                ClientMetrics.instance.markRequestDiscarded();
-                logger.trace("Discarded request of size: {}. 
InflightChannelRequestPayload: {}, " +
-                             "InflightEndpointRequestPayload: {}, 
InflightOverallRequestPayload: {}, Header: {}",
-                             messageSize,
-                             channelPayloadBytesInFlight,
-                             endpointReserve.using(),
-                             globalReserve.using(),
-                             header);
-
-                handleError(new OverloadedException("Server is in overloaded 
state. " +
-                                                    "Cannot accept more 
requests at this point"), header);
-
-                // Don't stop processing incoming messages, rely on the client 
to apply
-                // backpressure when it receives OverloadedException
-                // but discard this message as we're responding with the 
overloaded error
-                incrementReceivedMessageMetrics(messageSize);
-                buf.position(buf.position() + Envelope.Header.LENGTH + 
messageSize);
+                discardAndThrow(endpointReserve, globalReserve, buf, header, 
messageSize, Overload.BYTES_IN_FLIGHT);
+                return true;
+            }
+
+            if (DatabaseDescriptor.getNativeTransportRateLimitingEnabled() && 
!requestRateLimiter.tryReserve())
+            {
+                // We've already allocated against the bytes-in-flight limits, 
so release those resources.
+                release(header);
+                discardAndThrow(endpointReserve, globalReserve, buf, header, 
messageSize, Overload.REQUESTS);
                 return true;
             }
         }
-        else if (!acquireCapacityAndQueueOnFailure(header, endpointReserve, 
globalReserve))
+        else
         {
-            // set backpressure on the channel, queuing the request until we 
have capacity
-            ClientMetrics.instance.pauseConnection();
-            return false;
+            Overload backpressure = Overload.NONE;
+
+            if (!acquireCapacityAndQueueOnFailure(header, endpointReserve, 
globalReserve))
+            {
+                if (processRequestAndUpdateMetrics(bytes, header, messageSize, 
Overload.BYTES_IN_FLIGHT))
+                {
+                    if (decoder.isActive())
+                        ClientMetrics.instance.pauseConnection();
+                }
+
+                backpressure = Overload.BYTES_IN_FLIGHT;
+            }
+            
+            if (DatabaseDescriptor.getNativeTransportRateLimitingEnabled())
+            {
+                // Reserve a permit even if we've already triggered 
backpressure on bytes in flight.
+                long delay = 
requestRateLimiter.reserveAndGetDelay(RATE_LIMITER_DELAY_UNIT);
+                
+                if (backpressure == Overload.NONE && delay > 0)
+                {
+                    if (processRequestAndUpdateMetrics(bytes, header, 
messageSize, Overload.REQUESTS))
+                    {
+                        if (decoder.isActive())
+                            ClientMetrics.instance.pauseConnection();
+
+                        // Schedule a wakup here if we process successfully. 
The connection should be closing otherwise.  
+                        scheduleConnectionWakeupTask(delay, 
RATE_LIMITER_DELAY_UNIT);
+                    }
+                    
+                    backpressure = Overload.REQUESTS;
+                }
+            }
+            
+            // If we triggered backpressure, make sure the caller stops 
processing frames after the request completes.
+            if (backpressure != Overload.NONE)
+                return false;
         }
 
+        return processRequestAndUpdateMetrics(bytes, header, messageSize, 
Overload.NONE);
+    }
+
+    private boolean processRequestAndUpdateMetrics(ShareableBytes bytes, 
Envelope.Header header, int messageSize, Overload backpressure)
+    {
         channelPayloadBytesInFlight += messageSize;
         incrementReceivedMessageMetrics(messageSize);
-        return processRequest(composeRequest(header, bytes));
+        return processRequest(composeRequest(header, bytes), backpressure);
+    }
+
+    private void discardAndThrow(Limit endpointReserve, Limit globalReserve, 
+                                 ByteBuffer buf, Envelope.Header header, int 
messageSize,
+                                 Overload overload)
+    {
+        ClientMetrics.instance.markRequestDiscarded();
+        logOverload(endpointReserve, globalReserve, header, messageSize);
+
+        OverloadedException exception = 
buildOverloadedException(endpointReserve, globalReserve, overload);
+        handleError(exception, header);
+
+        // Don't stop processing incoming messages, as we rely on the client 
to apply
+        // backpressure when it receives OverloadedException, but discard this 
message 
+        // as we're responding with the overloaded error.
+        incrementReceivedMessageMetrics(messageSize);
+        buf.position(buf.position() + Envelope.Header.LENGTH + messageSize);
+    }
+
+    private OverloadedException buildOverloadedException(Limit 
endpointReserve, Limit globalReserve, Overload overload) {
+        return overload == Overload.REQUESTS
+                ? new OverloadedException(String.format("Request breached 
global limit of %d requests/second. Server is " +
+                                                        "currently in an 
overloaded state and cannot accept more requests.", 
+                                                        
requestRateLimiter.getRate()))
+                : new OverloadedException(String.format("Request breached 
limit on bytes in flight. (Endpoint: %d/%d bytes, Global: %d/%d bytes.) " +
+                                                        "Server is currently 
in an overloaded state and cannot accept more requests.",
+                                                        
endpointReserve.using(), endpointReserve.limit(), globalReserve.using(), 
globalReserve.limit()));
+    }
+
+    private void logOverload(Limit endpointReserve, Limit globalReserve, 
Envelope.Header header, int messageSize)
+    {
+        logger.trace("Discarded request of size {} with {} bytes in flight on 
channel. " + 
+                     "Using {}/{} bytes of endpoint limit and {}/{} bytes of 
global limit. " + 
+                     "Global rate limiter: {} Header: {}",
+                     messageSize, channelPayloadBytesInFlight,
+                     endpointReserve.using(), endpointReserve.limit(), 
globalReserve.using(), globalReserve.limit(),
+                     requestRateLimiter, header);
     }
 
     private boolean handleProtocolException(ProtocolException exception,
@@ -251,11 +339,17 @@ public class CQLMessageHandler<M extends Message> extends 
AbstractMessageHandler
 
     protected boolean processRequest(Envelope request)
     {
+        return processRequest(request, Overload.NONE);
+    }
+    
+    protected boolean processRequest(Envelope request, Overload backpressure)
+    {
         M message = null;
         try
         {
             message = messageDecoder.decode(channel, request);
-            dispatcher.accept(channel, message, this::toFlushItem);
+            dispatcher.accept(channel, message, this::toFlushItem, 
backpressure);
+            
             // sucessfully delivered a CQL message to the execution
             // stage, so reset the counter of consecutive errors
             consecutiveMessageErrors = 0;
@@ -293,8 +387,6 @@ public class CQLMessageHandler<M extends Message> extends 
AbstractMessageHandler
      * to the client.
      * This also releases the capacity acquired for processing as
      * indicated by supplied header.
-     * @param t
-     * @param header
      */
     private void handleErrorAndRelease(Throwable t, Envelope.Header header)
     {
@@ -311,8 +403,6 @@ public class CQLMessageHandler<M extends Message> extends 
AbstractMessageHandler
      * when an error occurs without any capacity being acquired.
      * Typically, this would be the result of an acquisition failure
      * if the THROW_ON_OVERLOAD option has been specified by the client.
-     * @param t
-     * @param header
      */
     private void handleError(Throwable t, Envelope.Header header)
     {
@@ -328,8 +418,6 @@ public class CQLMessageHandler<M extends Message> extends 
AbstractMessageHandler
      * when an error occurs without any capacity being acquired.
      * Typically, this would be the result of an acquisition failure
      * if the THROW_ON_OVERLOAD option has been specified by the client.
-     * @param t
-     * @param streamId
      */
     private void handleError(Throwable t, int streamId)
     {
@@ -342,8 +430,6 @@ public class CQLMessageHandler<M extends Message> extends 
AbstractMessageHandler
      * payload fails. This does not attempt to release any resources, as these 
errors
      * should only occur before any capacity acquisition is attempted (e.g. on 
receipt
      * of a corrupt frame, or failure to extract a CQL message from the 
envelope).
-     *
-     * @param t
      */
     private void handleError(Throwable t)
     {
@@ -405,8 +491,9 @@ public class CQLMessageHandler<M extends Message> extends 
AbstractMessageHandler
             // max CQL message size defaults to 256mb, so should be safe to 
downcast
             int messageSize = Ints.checkedCast(header.bodySizeInBytes);
             receivedBytes += buf.remaining();
-
+            
             LargeMessage largeMessage = new LargeMessage(header);
+
             if (!acquireCapacity(header, endpointReserve, globalReserve))
             {
                 // In the case of large messages, never stop processing 
incoming frames
@@ -424,21 +511,49 @@ public class CQLMessageHandler<M extends Message> extends 
AbstractMessageHandler
                 // concurrently.
                 if (throwOnOverload)
                 {
-                    // discard the request and throw an exception
+                    // Mark as overloaded so that discard the message after 
consuming any subsequent frames.
                     ClientMetrics.instance.markRequestDiscarded();
-                    logger.trace("Discarded request of size: {}. 
InflightChannelRequestPayload: {}, " +
-                                 "InflightEndpointRequestPayload: {}, 
InflightOverallRequestPayload: {}, Header: {}",
-                                 messageSize,
-                                 channelPayloadBytesInFlight,
-                                 endpointReserve.using(),
-                                 globalReserve.using(),
-                                 header);
-
-                    // mark as overloaded so that discard the message
-                    // after consuming any subsequent frames
-                    largeMessage.markOverloaded();
+                    logOverload(endpointReserve, globalReserve, header, 
messageSize);
+                    largeMessage.markOverloaded(Overload.BYTES_IN_FLIGHT);
                 }
             }
+            else if 
(DatabaseDescriptor.getNativeTransportRateLimitingEnabled())
+            {
+                if (throwOnOverload)
+                {
+                    if (!requestRateLimiter.tryReserve())
+                    {
+                        ClientMetrics.instance.markRequestDiscarded();
+                        logOverload(endpointReserve, globalReserve, header, 
messageSize);
+                        
+                        // Mark as overloaded so that we discard the message 
after consuming any subsequent frames.
+                        // (i.e. Request resources we may already have 
acquired above will be released.)
+                        largeMessage.markOverloaded(Overload.REQUESTS);
+                        
+                        this.largeMessage = largeMessage;
+                        largeMessage.supply(frame);
+                        return true;
+                    }
+                }
+                else
+                {
+                    long delay = 
requestRateLimiter.reserveAndGetDelay(RATE_LIMITER_DELAY_UNIT);
+
+                    if (delay > 0)
+                    {
+                        this.largeMessage = largeMessage;
+                        largeMessage.markBackpressure(Overload.REQUESTS);
+                        largeMessage.supply(frame);
+
+                        if (decoder.isActive())
+                            ClientMetrics.instance.pauseConnection();
+
+                        scheduleConnectionWakeupTask(delay, 
RATE_LIMITER_DELAY_UNIT);
+                        return false;
+                    }
+                }
+            }
+            
             this.largeMessage = largeMessage;
             largeMessage.supply(frame);
             return true;
@@ -454,6 +569,31 @@ public class CQLMessageHandler<M extends Message> extends 
AbstractMessageHandler
         return channel.id().asShortText();
     }
 
+    private void scheduleConnectionWakeupTask(long waitLength, TimeUnit unit)
+    {
+        channel.eventLoop().schedule(() ->
+                                     {
+                                         try
+                                         {
+                                             // We might have already 
reactivated via another wake task.
+                                             if (!decoder.isActive())
+                                             {
+                                                 decoder.reactivate();
+
+                                                 // Only update the relevant 
metric if we've actually activated.
+                                                 if (decoder.isActive())
+                                                     
ClientMetrics.instance.unpauseConnection();
+                                             }
+                                         }
+                                         catch (Throwable t)
+                                         {
+                                             fatalExceptionCaught(t);
+                                         }
+                                     },
+                                     waitLength,
+                                     unit);
+    }
+
     @SuppressWarnings("BooleanMethodIsAlwaysInverted")
     private boolean acquireCapacityAndQueueOnFailure(Envelope.Header header, 
Limit endpointReserve, Limit globalReserve)
     {
@@ -515,7 +655,8 @@ public class CQLMessageHandler<M extends Message> extends 
AbstractMessageHandler
     {
         private static final long EXPIRES_AT = Long.MAX_VALUE;
 
-        private boolean overloaded = false;
+        private Overload overload = Overload.NONE;
+        private Overload backpressure = Overload.NONE;
 
         private LargeMessage(Envelope.Header header)
         {
@@ -541,19 +682,22 @@ public class CQLMessageHandler<M extends Message> extends 
AbstractMessageHandler
          * so we must ensure that subsequent frames are consumed from the 
channel. At that
          * point an error response is returned to the client, rather than 
processing the message.
          */
-        private void markOverloaded()
+        private void markOverloaded(Overload overload)
+        {
+            this.overload = overload;
+        }
+
+        private void markBackpressure(Overload backpressure)
         {
-            overloaded = true;
+            this.backpressure = backpressure;
         }
 
         protected void onComplete()
         {
-            if (overloaded)
-                handleErrorAndRelease(new OverloadedException("Server is in 
overloaded state. " +
-                                                              "Cannot accept 
more requests at this point"), header);
+            if (overload != Overload.NONE)
+                
handleErrorAndRelease(buildOverloadedException(endpointReserveCapacity, 
globalReserveCapacity, overload), header);
             else if (!isCorrupt)
-                processRequest(assembleFrame());
-
+                processRequest(assembleFrame(), backpressure);
         }
 
         protected void abort()
diff --git a/src/java/org/apache/cassandra/transport/ClientResourceLimits.java 
b/src/java/org/apache/cassandra/transport/ClientResourceLimits.java
index 17a6e59..f9ed692 100644
--- a/src/java/org/apache/cassandra/transport/ClientResourceLimits.java
+++ b/src/java/org/apache/cassandra/transport/ClientResourceLimits.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.cassandra.utils.concurrent.NonBlockingRateLimiter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,6 +43,10 @@ public class ClientResourceLimits
     private static final AbstractMessageHandler.WaitQueue GLOBAL_QUEUE = 
AbstractMessageHandler.WaitQueue.global(GLOBAL_LIMIT);
     private static final ConcurrentMap<InetAddress, Allocator> 
PER_ENDPOINT_ALLOCATORS = new ConcurrentHashMap<>();
 
+    public static final NonBlockingRateLimiter GLOBAL_REQUEST_LIMITER = new 
NonBlockingRateLimiter(getNativeTransportMaxRequestsPerSecond());
+
+    public enum Overload { NONE, REQUESTS, BYTES_IN_FLIGHT }
+    
     public static Allocator getAllocatorForEndpoint(InetAddress endpoint)
     {
         while (true)
@@ -95,6 +100,19 @@ public class ClientResourceLimits
         return histogram.getSnapshot();
     }
 
+    public static int getNativeTransportMaxRequestsPerSecond()
+    {
+        return DatabaseDescriptor.getNativeTransportMaxRequestsPerSecond();
+    }
+
+    public static void setNativeTransportMaxRequestsPerSecond(int newPerSecond)
+    {
+        int existingPerSecond = getNativeTransportMaxRequestsPerSecond();
+        
DatabaseDescriptor.setNativeTransportMaxRequestsPerSecond(newPerSecond);
+        GLOBAL_REQUEST_LIMITER.setRate(newPerSecond);
+        logger.info("Changed native_transport_max_requests_per_second from {} 
to {}", existingPerSecond, newPerSecond);
+    }
+
     /**
      * This will recompute the ip usage histo on each query of the snapshot 
when requested instead of trying to keep
      * a histogram up to date with each request
@@ -166,8 +184,8 @@ public class ClientResourceLimits
          *
          * @param amount number permits to allocate
          * @return outcome SUCCESS if the allocation was successful. In the 
case of failure,
-         * either INSUFFICIENT_GLOBAL or INSUFFICIENT_ENPOINT to indicate 
which reserve rejected
-         * the allocation request.
+         * either INSUFFICIENT_GLOBAL or INSUFFICIENT_ENDPOINT to indicate 
which 
+         * reserve rejected the allocation request.
          */
         ResourceLimits.Outcome tryAllocate(long amount)
         {
@@ -211,19 +229,18 @@ public class ClientResourceLimits
            return endpointAndGlobal.global().using();
         }
 
+        @Override
         public String toString()
         {
-            return String.format("InflightEndpointRequestPayload: %d/%d, 
InflightOverallRequestPayload: %d/%d",
-                                 endpointAndGlobal.endpoint().using(),
-                                 endpointAndGlobal.endpoint().limit(),
-                                 endpointAndGlobal.global().using(),
-                                 endpointAndGlobal.global().limit());
+            return String.format("Using %d/%d bytes of endpoint limit and 
%d/%d bytes of global limit.",
+                                 endpointAndGlobal.endpoint().using(), 
endpointAndGlobal.endpoint().limit(),
+                                 endpointAndGlobal.global().using(), 
endpointAndGlobal.global().limit());
         }
     }
 
     /**
      * Used in protocol V5 and later by the 
AbstractMessageHandler/CQLMessageHandler hierarchy.
-     * This hides the allocate/tryAllocate/release methods from 
EndpointResourceLimits and exposes
+     * This hides the allocate/tryAllocate/release methods from {@link 
ClientResourceLimits} and exposes
      * the endpoint and global limits, along with their corresponding
      * {@link org.apache.cassandra.net.AbstractMessageHandler.WaitQueue} 
directly.
      * Provided as an interface and single implementation for testing (see 
CQLConnectionTest)
@@ -234,9 +251,10 @@ public class ClientResourceLimits
         AbstractMessageHandler.WaitQueue globalWaitQueue();
         ResourceLimits.Limit endpointLimit();
         AbstractMessageHandler.WaitQueue endpointWaitQueue();
+        NonBlockingRateLimiter requestRateLimiter();
         void release();
 
-        static class Default implements ResourceProvider
+        class Default implements ResourceProvider
         {
             private final Allocator limits;
 
@@ -265,6 +283,11 @@ public class ClientResourceLimits
                 return limits.waitQueue;
             }
 
+            public NonBlockingRateLimiter requestRateLimiter()
+            {
+                return GLOBAL_REQUEST_LIMITER;
+            }
+            
             public void release()
             {
                 limits.release();
diff --git a/src/java/org/apache/cassandra/transport/Dispatcher.java 
b/src/java/org/apache/cassandra/transport/Dispatcher.java
index 05b55e8..9a9cdce 100644
--- a/src/java/org/apache/cassandra/transport/Dispatcher.java
+++ b/src/java/org/apache/cassandra/transport/Dispatcher.java
@@ -20,8 +20,12 @@ package org.apache.cassandra.transport;
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import io.netty.channel.Channel;
 import io.netty.channel.EventLoop;
 import io.netty.util.AttributeKey;
@@ -30,15 +34,19 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.net.FrameEncoder;
 import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.transport.ClientResourceLimits.Overload;
 import org.apache.cassandra.transport.Flusher.FlushItem;
 import org.apache.cassandra.transport.messages.ErrorMessage;
 import org.apache.cassandra.transport.messages.EventMessage;
 import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.NoSpamLogger;
 
 import static org.apache.cassandra.concurrent.SharedExecutorPool.SHARED;
 
 public class Dispatcher
 {
+    private static final Logger logger = 
LoggerFactory.getLogger(Dispatcher.class);
+    
     private static final LocalAwareExecutorService requestExecutor = 
SHARED.newExecutor(DatabaseDescriptor.getNativeTransportMaxThreads(),
                                                                                
         DatabaseDescriptor::setNativeTransportMaxThreads,
                                                                                
         "transport",
@@ -65,20 +73,37 @@ public class Dispatcher
         this.useLegacyFlusher = useLegacyFlusher;
     }
 
-    public void dispatch(Channel channel, Message.Request request, 
FlushItemConverter forFlusher)
+    public void dispatch(Channel channel, Message.Request request, 
FlushItemConverter forFlusher, Overload backpressure)
     {
-        requestExecutor.submit(() -> processRequest(channel, request, 
forFlusher));
+        requestExecutor.submit(() -> processRequest(channel, request, 
forFlusher, backpressure));
     }
 
     /**
      * Note: this method may be executed on the netty event loop, during 
initial protocol negotiation
      */
-    static Message.Response processRequest(ServerConnection connection, 
Message.Request request)
+    static Message.Response processRequest(ServerConnection connection, 
Message.Request request, Overload backpressure)
     {
         long queryStartNanoTime = System.nanoTime();
         if (connection.getVersion().isGreaterOrEqualTo(ProtocolVersion.V4))
             ClientWarn.instance.captureWarnings();
 
+        if (backpressure == Overload.REQUESTS)
+        {
+            String message = String.format("Request breached global limit of 
%d requests/second and triggered backpressure.",
+                                           
ClientResourceLimits.getNativeTransportMaxRequestsPerSecond());
+            
+            NoSpamLogger.log(logger, NoSpamLogger.Level.INFO, 1, 
TimeUnit.MINUTES, message);
+            ClientWarn.instance.warn(message);
+        }
+        else if (backpressure == Overload.BYTES_IN_FLIGHT)
+        {
+            String message = String.format("Request breached limit(s) on bytes 
in flight (Endpoint: %d, Global: %d) and triggered backpressure.",
+                                           
ClientResourceLimits.getEndpointLimit(), ClientResourceLimits.getGlobalLimit());
+            
+            NoSpamLogger.log(logger, NoSpamLogger.Level.INFO, 1, 
TimeUnit.MINUTES, message);
+            ClientWarn.instance.warn(message);
+        }
+
         QueryState qstate = connection.validateNewMessage(request.type, 
connection.getVersion());
 
         Message.logger.trace("Received: {}, v={}", request, 
connection.getVersion());
@@ -94,7 +119,7 @@ public class Dispatcher
     /**
      * Note: this method is not expected to execute on the netty event loop.
      */
-    void processRequest(Channel channel, Message.Request request, 
FlushItemConverter forFlusher)
+    void processRequest(Channel channel, Message.Request request, 
FlushItemConverter forFlusher, Overload backpressure)
     {
         final Message.Response response;
         final ServerConnection connection;
@@ -103,7 +128,7 @@ public class Dispatcher
         {
             assert request.connection() instanceof ServerConnection;
             connection = (ServerConnection) request.connection();
-            response = processRequest(connection, request);
+            response = processRequest(connection, request, backpressure);
             toFlush = forFlusher.toFlushItem(channel, request, response);
             Message.logger.trace("Responding: {}, v={}", response, 
connection.getVersion());
         }
@@ -152,7 +177,7 @@ public class Dispatcher
      * for delivering events to registered clients is dependent on protocol 
version and the configuration
      * of the pipeline. For v5 and newer connections, the event message is 
encoded into an Envelope,
      * wrapped in a FlushItem and then delivered via the pipeline's flusher, 
in a similar way to
-     * a Response returned from {@link #processRequest(Channel, 
Message.Request, FlushItemConverter)}.
+     * a Response returned from {@link #processRequest(Channel, 
Message.Request, FlushItemConverter, Overload)}.
      * It's worth noting that events are not generally fired as a direct 
response to a client request,
      * so this flush item has a null request attribute. The dispatcher itself 
is created when the
      * pipeline is first configured during protocol negotiation and is 
attached to the channel for
diff --git a/src/java/org/apache/cassandra/transport/ExceptionHandlers.java 
b/src/java/org/apache/cassandra/transport/ExceptionHandlers.java
index 8ad30ce..63951e9 100644
--- a/src/java/org/apache/cassandra/transport/ExceptionHandlers.java
+++ b/src/java/org/apache/cassandra/transport/ExceptionHandlers.java
@@ -32,6 +32,7 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.ChannelPromise;
+import org.apache.cassandra.exceptions.OverloadedException;
 import org.apache.cassandra.metrics.ClientMetrics;
 import org.apache.cassandra.net.FrameEncoder;
 import org.apache.cassandra.transport.messages.ErrorMessage;
@@ -87,22 +88,8 @@ public class ExceptionHandlers
                     JVMStabilityInspector.inspectThrowable(cause);
                 }
             }
-            if (Throwables.anyCauseMatches(cause, t -> t instanceof 
ProtocolException))
-            {
-                // if any ProtocolExceptions is not silent, then handle
-                if (Throwables.anyCauseMatches(cause, t -> t instanceof 
ProtocolException && !((ProtocolException) t).isSilent()))
-                {
-                    ClientMetrics.instance.markProtocolException();
-                    // since protocol exceptions are expected to be client 
issues, not logging stack trace
-                    // to avoid spamming the logs once a bad client shows up
-                    NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, 
TimeUnit.MINUTES, "Protocol exception with client networking: " + 
cause.getMessage());
-                }
-            }
-            else
-            {
-                ClientMetrics.instance.markUnknownException();
-                logger.warn("Unknown exception in client networking", cause);
-            }
+            
+            logClientNetworkingExceptions(cause);
         }
 
         private static boolean isFatal(Throwable cause)
@@ -112,6 +99,31 @@ public class ExceptionHandlers
         }
     }
 
+    static void logClientNetworkingExceptions(Throwable cause)
+    {
+        if (Throwables.anyCauseMatches(cause, t -> t instanceof 
ProtocolException))
+        {
+            // if any ProtocolExceptions is not silent, then handle
+            if (Throwables.anyCauseMatches(cause, t -> t instanceof 
ProtocolException && !((ProtocolException) t).isSilent()))
+            {
+                ClientMetrics.instance.markProtocolException();
+                // since protocol exceptions are expected to be client issues, 
not logging stack trace
+                // to avoid spamming the logs once a bad client shows up
+                NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, 
TimeUnit.MINUTES, "Protocol exception with client networking: " + 
cause.getMessage());
+            }
+        }
+        else if (Throwables.anyCauseMatches(cause, t -> t instanceof 
OverloadedException))
+        {
+            // Once the threshold for overload is breached, it will very 
likely spam the logs...
+            NoSpamLogger.log(logger, NoSpamLogger.Level.INFO, 1, 
TimeUnit.MINUTES, cause.getMessage());
+        }
+        else
+        {
+            ClientMetrics.instance.markUnknownException();
+            logger.warn("Unknown exception in client networking", cause);
+        }
+    }
+
     /**
      * Include the channel info in the logged information for unexpected 
errors, and (if {@link #alwaysLogAtError} is
      * false then choose the log level based on the type of exception (some 
are clearly client issues and shouldn't be
diff --git 
a/src/java/org/apache/cassandra/transport/InitialConnectionHandler.java 
b/src/java/org/apache/cassandra/transport/InitialConnectionHandler.java
index 77e9232..f3c050d 100644
--- a/src/java/org/apache/cassandra/transport/InitialConnectionHandler.java
+++ b/src/java/org/apache/cassandra/transport/InitialConnectionHandler.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.cassandra.transport.ClientResourceLimits.Overload;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -147,7 +148,7 @@ public class InitialConnectionHandler extends 
ByteToMessageDecoder
                         promise = new VoidChannelPromise(ctx.channel(), false);
                     }
 
-                    final Message.Response response = 
Dispatcher.processRequest((ServerConnection) connection, startup);
+                    final Message.Response response = 
Dispatcher.processRequest((ServerConnection) connection, startup, 
Overload.NONE);
                     outbound = response.encode(inbound.header.version);
                     ctx.writeAndFlush(outbound, promise);
                     logger.debug("Configured pipeline: {}", ctx.pipeline());
diff --git a/src/java/org/apache/cassandra/transport/PreV5Handlers.java 
b/src/java/org/apache/cassandra/transport/PreV5Handlers.java
index cec8edd..f45028d 100644
--- a/src/java/org/apache/cassandra/transport/PreV5Handlers.java
+++ b/src/java/org/apache/cassandra/transport/PreV5Handlers.java
@@ -19,8 +19,9 @@
 package org.apache.cassandra.transport;
 
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.transport.ClientResourceLimits.Overload;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,10 +40,9 @@ import org.apache.cassandra.metrics.ClientMetrics;
 import org.apache.cassandra.net.ResourceLimits;
 import org.apache.cassandra.transport.messages.ErrorMessage;
 import org.apache.cassandra.utils.JVMStabilityInspector;
-import org.apache.cassandra.utils.NoSpamLogger;
-import org.apache.cassandra.utils.Throwables;
 
-import static org.apache.cassandra.transport.Message.logger;
+import static 
org.apache.cassandra.transport.CQLMessageHandler.RATE_LIMITER_DELAY_UNIT;
+import static 
org.apache.cassandra.transport.ClientResourceLimits.GLOBAL_REQUEST_LIMITER;
 
 public class PreV5Handlers
 {
@@ -63,7 +63,9 @@ public class PreV5Handlers
          * Note: should only be accessed while on the netty event loop.
          */
         private long channelPayloadBytesInFlight;
-        private boolean paused;
+        
+        /** The cause of the current connection pause, or {@link 
Overload#NONE} if it is unpaused. */
+        private Overload backpressure = Overload.NONE;
 
         LegacyDispatchHandler(Dispatcher dispatcher, 
ClientResourceLimits.Allocator endpointPayloadTracker)
         {
@@ -71,11 +73,12 @@ public class PreV5Handlers
             this.endpointPayloadTracker = endpointPayloadTracker;
         }
 
-        protected void channelRead0(ChannelHandlerContext ctx, Message.Request 
request) throws Exception
+        protected void channelRead0(ChannelHandlerContext ctx, Message.Request 
request)
         {
-            // if we decide to handle this message, process it outside of the 
netty event loop
-            if (shouldHandleRequest(ctx, request))
-                dispatcher.dispatch(ctx.channel(), request, this::toFlushItem);
+            // The only reason we won't process this message is if 
checkLimits() throws an OverloadedException.
+            // (i.e. Even if backpressure is applied, the current request is 
allowed to finish.)
+            checkLimits(ctx, request);
+            dispatcher.dispatch(ctx.channel(), request, this::toFlushItem, 
backpressure);
         }
 
         // Acts as a Dispatcher.FlushItemConverter
@@ -95,76 +98,143 @@ public class PreV5Handlers
 
             // since the request has been processed, decrement inflight 
payload at channel, endpoint and global levels
             channelPayloadBytesInFlight -= itemSize;
-            ResourceLimits.Outcome endpointGlobalReleaseOutcome = 
endpointPayloadTracker.release(itemSize);
+            boolean globalInFlightBytesBelowLimit = 
endpointPayloadTracker.release(itemSize) == ResourceLimits.Outcome.BELOW_LIMIT;
 
-            // now check to see if we need to reenable the channel's autoRead.
-            // If the current payload side is zero, we must reenable autoread 
as
+            // Now check to see if we need to reenable the channel's autoRead.
+            //
+            // If the current payload bytes in flight is zero, we must 
reenable autoread as
             // 1) we allow no other thread/channel to do it, and
-            // 2) there's no other events following this one (becuase we're at 
zero bytes in flight),
-            // so no successive to trigger the other clause in this if-block
+            // 2) there are no other events following this one (becuase we're 
at zero bytes in flight),
+            // so no successive to trigger the other clause in this if-block.
+            //
+            // The only exception to this is if the global request rate limit 
has been breached, which means
+            // we'll have to wait until a scheduled wakeup task unpauses the 
connection.
             //
-            // note: this path is only relevant when part of a pre-V5 
pipeline, as only in this case is
+            // Note: This path is only relevant when part of a pre-V5 
pipeline, as only in this case is
             // paused ever set to true. In pipelines configured for V5 or 
later, backpressure and control
             // over the inbound pipeline's autoread status are handled by the 
FrameDecoder/FrameProcessor.
             ChannelConfig config = item.channel.config();
-            if (paused && (channelPayloadBytesInFlight == 0 || 
endpointGlobalReleaseOutcome == ResourceLimits.Outcome.BELOW_LIMIT))
+
+            if (backpressure == Overload.BYTES_IN_FLIGHT && 
(channelPayloadBytesInFlight == 0 || globalInFlightBytesBelowLimit))
             {
-                paused = false;
-                ClientMetrics.instance.unpauseConnection();
-                config.setAutoRead(true);
+                unpauseConnection(config);
             }
         }
 
         /**
-         * This check for inflight payload to potentially discard the request 
should have been ideally in one of the
-         * first handlers in the pipeline (Envelope.Decoder::decode()). 
However, incase of any exception thrown between that
-         * handler (where inflight payload is incremented) and this handler 
(Dispatcher::channelRead0) (where inflight
-         * payload in decremented), inflight payload becomes erroneous. 
ExceptionHandler is not sufficient for this
-         * purpose since it does not have the message envelope associated with 
the exception.
+         * Checks limits on bytes in flight and the request rate limiter (if 
enabled) to determine whether to drop a
+         * request or trigger backpressure and pause the connection.
+         * <p>
+         * The check for inflight payload to potentially discard the request 
should have been ideally in one of the
+         * first handlers in the pipeline (Envelope.Decoder::decode()). 
However, in case of any exception thrown between
+         * that handler (where inflight payload is incremented) and this 
handler (Dispatcher::channelRead0) (where 
+         * inflight payload in decremented), inflight payload becomes 
erroneous. ExceptionHandler is not sufficient for 
+         * this purpose since it does not have the message envelope associated 
with the exception.
+         * <p>
+         * If the connection is configured to throw {@link 
OverloadedException}, requests that breach the rate limit are
+         * not counted against that limit.
          * <p>
          * Note: this method should execute on the netty event loop.
+         * 
+         * @throws ErrorMessage.WrappedException with an {@link 
OverloadedException} if overload occurs and the 
+         *         connection is configured to throw on overload
          */
-        private boolean shouldHandleRequest(ChannelHandlerContext ctx, 
Message.Request request)
+        private void checkLimits(ChannelHandlerContext ctx, Message.Request 
request)
         {
             long requestSize = request.getSource().header.bodySizeInBytes;
-
-            // check for overloaded state by trying to allocate the message 
size from inflight payload trackers
-            if (endpointPayloadTracker.tryAllocate(requestSize) != 
ResourceLimits.Outcome.SUCCESS)
+            
+            if (request.connection.isThrowOnOverload())
             {
-                if (request.connection.isThrowOnOverload())
+                if (endpointPayloadTracker.tryAllocate(requestSize) != 
ResourceLimits.Outcome.SUCCESS)
+                {
+                    discardAndThrow(request, requestSize, 
Overload.BYTES_IN_FLIGHT);
+                }
+
+                if (DatabaseDescriptor.getNativeTransportRateLimitingEnabled() 
&& !GLOBAL_REQUEST_LIMITER.tryReserve())
                 {
-                    // discard the request and throw an exception
-                    ClientMetrics.instance.markRequestDiscarded();
-                    logger.trace("Discarded request of size: {}. 
InflightChannelRequestPayload: {}, {}, Request: {}",
-                                 requestSize,
-                                 channelPayloadBytesInFlight,
-                                 endpointPayloadTracker.toString(),
-                                 request);
-                    throw ErrorMessage.wrap(new OverloadedException("Server is 
in overloaded state. Cannot accept more requests at this point"),
-                                            
request.getSource().header.streamId);
+                    // We've already allocated against the payload tracker 
here, so release those resources.
+                    endpointPayloadTracker.release(requestSize);
+                    discardAndThrow(request, requestSize, Overload.REQUESTS);
                 }
-                else
+            }
+            else
+            {
+                // Any request that gets here will be processed, so increment 
the channel bytes in flight.
+                channelPayloadBytesInFlight += requestSize;
+                
+                // Check for overloaded state by trying to allocate the 
message size from inflight payload trackers
+                if (endpointPayloadTracker.tryAllocate(requestSize) != 
ResourceLimits.Outcome.SUCCESS)
                 {
-                    // set backpressure on the channel, and handle the request
                     endpointPayloadTracker.allocate(requestSize);
-                    ctx.channel().config().setAutoRead(false);
-                    ClientMetrics.instance.pauseConnection();
-                    paused = true;
+                    pauseConnection(ctx);
+                    backpressure = Overload.BYTES_IN_FLIGHT;
+                }
+
+                if (DatabaseDescriptor.getNativeTransportRateLimitingEnabled())
+                {
+                    // Reserve a permit even if we've already triggered 
backpressure on bytes in flight.
+                    long delay = 
GLOBAL_REQUEST_LIMITER.reserveAndGetDelay(RATE_LIMITER_DELAY_UNIT);
+                    
+                    // If we've already triggered backpressure on bytes in 
flight, no further action is necessary.
+                    if (backpressure == Overload.NONE && delay > 0)
+                    {
+                        pauseConnection(ctx);
+                        
+                        // A permit isn't immediately available, so schedule 
an unpause for when it is.
+                        ctx.channel().eventLoop().schedule(() -> 
unpauseConnection(ctx.channel().config()), delay, RATE_LIMITER_DELAY_UNIT);
+                        backpressure = Overload.REQUESTS;
+                    }
                 }
             }
+        }
+
+        private void pauseConnection(ChannelHandlerContext ctx)
+        {
+            if (ctx.channel().config().isAutoRead())
+            {
+                ctx.channel().config().setAutoRead(false);
+                ClientMetrics.instance.pauseConnection();
+            }
+        }
 
-            channelPayloadBytesInFlight += requestSize;
-            return true;
+        private void unpauseConnection(ChannelConfig config)
+        {
+            backpressure = Overload.NONE;
+            
+            if (!config.isAutoRead())
+            {
+                ClientMetrics.instance.unpauseConnection();
+                config.setAutoRead(true);
+            }
         }
 
+        private void discardAndThrow(Message.Request request, long 
requestSize, Overload overload)
+        {
+            ClientMetrics.instance.markRequestDiscarded();
+
+            logger.trace("Discarded request of size {} with {} bytes in flight 
on channel. {} " + 
+                         "Global rate limiter: {} Request: {}",
+                         requestSize, channelPayloadBytesInFlight, 
endpointPayloadTracker,
+                         GLOBAL_REQUEST_LIMITER, request);
+
+            OverloadedException exception = overload == Overload.REQUESTS
+                    ? new OverloadedException(String.format("Request breached 
global limit of %d requests/second. Server is " +
+                                                            "currently in an 
overloaded state and cannot accept more requests.",
+                                                            
GLOBAL_REQUEST_LIMITER.getRate()))
+                    : new OverloadedException(String.format("Request breached 
limit on bytes in flight. (%s)) " +
+                                                            "Server is 
currently in an overloaded state and cannot accept more requests.",
+
+                    endpointPayloadTracker));
+            
+            throw ErrorMessage.wrap(exception, 
request.getSource().header.streamId);
+        }
 
         @Override
         public void channelInactive(ChannelHandlerContext ctx)
         {
             endpointPayloadTracker.release();
-            if (paused)
+            if (!ctx.channel().config().isAutoRead())
             {
-                paused = false;
                 ClientMetrics.instance.unpauseConnection();
             }
             ctx.fireChannelInactive();
@@ -181,7 +251,7 @@ public class PreV5Handlers
         public static final ProtocolDecoder instance = new ProtocolDecoder();
         private ProtocolDecoder(){}
 
-        public void decode(ChannelHandlerContext ctx, Envelope source, List 
results)
+        public void decode(ChannelHandlerContext ctx, Envelope source, 
List<Object> results)
         {
             try
             {
@@ -212,7 +282,7 @@ public class PreV5Handlers
     {
         public static final ProtocolEncoder instance = new ProtocolEncoder();
         private ProtocolEncoder(){}
-        public void encode(ChannelHandlerContext ctx, Message source, List 
results)
+        public void encode(ChannelHandlerContext ctx, Message source, 
List<Object> results)
         {
             ProtocolVersion version = getConnectionVersion(ctx);
             results.add(source.encode(version));
@@ -244,22 +314,8 @@ public class PreV5Handlers
                 if (isFatal(cause))
                     future.addListener((ChannelFutureListener) f -> 
ctx.close());
             }
-            if (Throwables.anyCauseMatches(cause, t -> t instanceof 
ProtocolException))
-            {
-                // if any ProtocolExceptions is not silent, then handle
-                if (Throwables.anyCauseMatches(cause, t -> t instanceof 
ProtocolException && !((ProtocolException) t).isSilent()))
-                {
-                    ClientMetrics.instance.markProtocolException();
-                    // since protocol exceptions are expected to be client 
issues, not logging stack trace
-                    // to avoid spamming the logs once a bad client shows up
-                    NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, 
TimeUnit.MINUTES, "Protocol exception with client networking: " + 
cause.getMessage());
-                }
-            }
-            else
-            {
-                ClientMetrics.instance.markUnknownException();
-                logger.warn("Unknown exception in client networking", cause);
-            }
+            
+            ExceptionHandlers.logClientNetworkingExceptions(cause);
             JVMStabilityInspector.inspectThrowable(cause);
         }
 
diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java 
b/src/java/org/apache/cassandra/transport/SimpleClient.java
index ae89e93..87e54a1 100644
--- a/src/java/org/apache/cassandra/transport/SimpleClient.java
+++ b/src/java/org/apache/cassandra/transport/SimpleClient.java
@@ -27,6 +27,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.primitives.Ints;
+import org.apache.cassandra.transport.ClientResourceLimits.Overload;
+import org.apache.cassandra.utils.concurrent.NonBlockingRateLimiter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,9 +52,13 @@ import org.apache.cassandra.transport.messages.*;
 
 import static org.apache.cassandra.transport.CQLMessageHandler.envelopeSize;
 import static org.apache.cassandra.transport.Flusher.MAX_FRAMED_PAYLOAD_SIZE;
+import static 
org.apache.cassandra.utils.concurrent.NonBlockingRateLimiter.NO_OP_LIMITER;
 
 public class SimpleClient implements Closeable
 {
+
+    public static final int TIMEOUT_SECONDS = 10;
+
     static
     {
         InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory());
@@ -281,7 +287,7 @@ public class SimpleClient implements Closeable
         {
             request.attach(connection);
             lastWriteFuture = 
channel.writeAndFlush(Collections.singletonList(request));
-            Message.Response msg = responseHandler.responses.poll(10, 
TimeUnit.SECONDS);
+            Message.Response msg = 
responseHandler.responses.poll(TIMEOUT_SECONDS, TimeUnit.SECONDS);
             if (msg == null)
                 throw new RuntimeException("timeout");
             if (throwOnErrorResponse && msg instanceof ErrorMessage)
@@ -310,7 +316,7 @@ public class SimpleClient implements Closeable
                 }
                 lastWriteFuture = channel.writeAndFlush(requests);
 
-                long deadline = System.currentTimeMillis() + 
TimeUnit.SECONDS.toMillis(10);
+                long deadline = System.currentTimeMillis() + 
TimeUnit.SECONDS.toMillis(TIMEOUT_SECONDS);
                 for (int i = 0; i < requests.size(); i++)
                 {
                     Message.Response msg = 
responseHandler.responses.poll(deadline - System.currentTimeMillis(), 
TimeUnit.MILLISECONDS);
@@ -393,7 +399,7 @@ public class SimpleClient implements Closeable
                 case AUTHENTICATE:
                     if 
(response.header.version.isGreaterOrEqualTo(ProtocolVersion.V5))
                     {
-                        configureModernPipeline(ctx, response);
+                        configureModernPipeline(ctx, response, 
largeMessageThreshold);
                         // consuming the message is done when setting up the 
pipeline
                     }
                     else
@@ -414,7 +420,7 @@ public class SimpleClient implements Closeable
             }
         }
 
-        private void configureModernPipeline(ChannelHandlerContext ctx, 
Envelope response)
+        private void configureModernPipeline(ChannelHandlerContext ctx, 
Envelope response, int largeMessageThreshold)
         {
             logger.info("Configuring modern pipeline");
             ChannelPipeline pipeline = ctx.pipeline();
@@ -434,7 +440,7 @@ public class SimpleClient implements Closeable
             FrameEncoder frameEncoder = frameEncoder(ctx);
             FrameEncoder.PayloadAllocator payloadAllocator = 
frameEncoder.allocator();
 
-            CQLMessageHandler.MessageConsumer<Message.Response> 
responseConsumer = (c, message, converter) -> {
+            CQLMessageHandler.MessageConsumer<Message.Response> 
responseConsumer = (c, message, converter, backpressured) -> {
                 responseHandler.handleResponse(c, message);
             };
 
@@ -470,6 +476,12 @@ public class SimpleClient implements Closeable
                     return endpointQueue;
                 }
 
+                @Override
+                public NonBlockingRateLimiter requestRateLimiter()
+                {
+                    return NO_OP_LIMITER;
+                }
+
                 public void release()
                 {
                 }
@@ -512,7 +524,7 @@ public class SimpleClient implements Closeable
                     Connection connection = 
ctx.channel().attr(Connection.attributeKey).get();
                     // The only case the connection can be null is when we 
send the initial STARTUP message (client side thus)
                     ProtocolVersion version = connection == null ? 
ProtocolVersion.CURRENT : connection.getVersion();
-                    SimpleFlusher flusher = new SimpleFlusher(frameEncoder);
+                    SimpleFlusher flusher = new SimpleFlusher(frameEncoder, 
largeMessageThreshold);
                     for (Message message : (List<Message>) msg)
                         flusher.enqueue(message.encode(version));
 
@@ -522,7 +534,7 @@ public class SimpleClient implements Closeable
             pipeline.remove(this);
 
             Message.Response message = messageDecoder.decode(ctx.channel(), 
response);
-            responseConsumer.accept(channel, message, (ch, req, resp) -> null);
+            responseConsumer.accept(channel, message, (ch, req, resp) -> null, 
Overload.NONE);
         }
 
         private FrameDecoder frameDecoder(ChannelHandlerContext ctx, 
BufferPoolAllocator allocator)
@@ -669,10 +681,17 @@ public class SimpleClient implements Closeable
         final Queue<Envelope> outbound = new ConcurrentLinkedQueue<>();
         final FrameEncoder frameEncoder;
         private final AtomicBoolean scheduled = new AtomicBoolean(false);
+        private final int largeMessageThreshold;
 
-        SimpleFlusher(FrameEncoder frameEncoder)
+        SimpleFlusher(FrameEncoder frameEncoder, int largeMessageThreshold)
         {
             this.frameEncoder = frameEncoder;
+            this.largeMessageThreshold = largeMessageThreshold;
+        }
+
+        SimpleFlusher(FrameEncoder frameEncoder)
+        {
+            this(frameEncoder, MAX_FRAMED_PAYLOAD_SIZE);
         }
 
         public void enqueue(Envelope message)
@@ -709,14 +728,14 @@ public class SimpleClient implements Closeable
             Envelope f;
             while ((f = outbound.poll()) != null)
             {
-                if (f.header.bodySizeInBytes > MAX_FRAMED_PAYLOAD_SIZE)
+                if (f.header.bodySizeInBytes > largeMessageThreshold)
                 {
                     combiner.addAll(writeLargeMessage(ctx, f));
                 }
                 else
                 {
                     int messageSize = envelopeSize(f.header);
-                    if (bufferSize + messageSize >= MAX_FRAMED_PAYLOAD_SIZE)
+                    if (bufferSize + messageSize >= largeMessageThreshold)
                     {
                         combiner.add(flushBuffer(ctx, buffer, bufferSize));
                         buffer = new ArrayList<>();
@@ -751,9 +770,9 @@ public class SimpleClient implements Closeable
         private FrameEncoder.Payload allocate(int size, boolean selfContained)
         {
             FrameEncoder.Payload payload = frameEncoder.allocator()
-                                                       
.allocate(selfContained, Math.min(size, MAX_FRAMED_PAYLOAD_SIZE));
-            if (size >= MAX_FRAMED_PAYLOAD_SIZE)
-                payload.buffer.limit(MAX_FRAMED_PAYLOAD_SIZE);
+                                                       
.allocate(selfContained, Math.min(size, largeMessageThreshold));
+            if (size >= largeMessageThreshold)
+                payload.buffer.limit(largeMessageThreshold);
 
             return payload;
         }
@@ -766,14 +785,14 @@ public class SimpleClient implements Closeable
             boolean firstFrame = true;
             while (f.body.readableBytes() > 0 || firstFrame)
             {
-                int payloadSize = Math.min(f.body.readableBytes(), 
MAX_FRAMED_PAYLOAD_SIZE);
+                int payloadSize = Math.min(f.body.readableBytes(), 
largeMessageThreshold);
                 payload = allocate(f.body.readableBytes(), false);
 
                 buf = payload.buffer;
                 // BufferPool may give us a buffer larger than we asked for.
                 // FrameEncoder may object if buffer.remaining is >= MAX_SIZE.
-                if (payloadSize >= MAX_FRAMED_PAYLOAD_SIZE)
-                    buf.limit(MAX_FRAMED_PAYLOAD_SIZE);
+                if (payloadSize >= largeMessageThreshold)
+                    buf.limit(largeMessageThreshold);
 
                 if (firstFrame)
                 {
diff --git 
a/src/java/org/apache/cassandra/utils/concurrent/NonBlockingRateLimiter.java 
b/src/java/org/apache/cassandra/utils/concurrent/NonBlockingRateLimiter.java
new file mode 100644
index 0000000..2a98222
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/concurrent/NonBlockingRateLimiter.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils.concurrent;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Ticker;
+
+import javax.annotation.concurrent.ThreadSafe;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.lang.Math.toIntExact;
+
+/**
+ * A rate limiter implementation that allows callers to reserve permits that 
may only be available 
+ * in the future, delegating to them decisions about how to schedule/delay 
work and whether or not
+ * to block execution to do so.
+ */
+@SuppressWarnings("UnstableApiUsage")
+@ThreadSafe
+public class NonBlockingRateLimiter
+{
+    public static final long NANOS_PER_SECOND = TimeUnit.SECONDS.toNanos(1);
+    static final long DEFAULT_BURST_NANOS = NANOS_PER_SECOND;
+
+    /** a starting time for elapsed time calculations */
+    private final long startedNanos;
+
+    /** nanoseconds from start time corresponding to the next available permit 
*/
+    private final AtomicLong nextAvailable = new AtomicLong();
+    
+    private volatile Ticker ticker;
+    
+    private volatile int permitsPerSecond;
+    
+    /** time in nanoseconds between permits on the timeline */
+    private volatile long intervalNanos;
+
+    /**
+     * To allow the limiter to more closely adhere to the configured rate in 
the face of
+     * unevenly distributed permits requests, it will allow a number of 
permits equal to
+     * burstNanos / intervalNanos to be issued in a "burst" before reaching a 
steady state.
+     * 
+     * Another way to think about this is that it allows us to bring forward 
the permits
+     * from short periods of inactivity. This is especially useful when the 
upstream user
+     * of the limiter delays request processing mechanism contains overhead 
that is longer
+     * than the intervalNanos in duration.
+     */
+    private final long burstNanos;
+
+    public NonBlockingRateLimiter(int permitsPerSecond)
+    {
+        this(permitsPerSecond, DEFAULT_BURST_NANOS, Ticker.systemTicker());
+    }
+
+    @VisibleForTesting
+    public NonBlockingRateLimiter(int permitsPerSecond, long burstNanos, 
Ticker ticker)
+    {
+        this.startedNanos = ticker.read();
+        this.burstNanos = burstNanos;
+        setRate(permitsPerSecond, ticker);
+    }
+
+    public void setRate(int permitsPerSecond)
+    {
+        setRate(permitsPerSecond, Ticker.systemTicker());
+    }
+
+    @VisibleForTesting
+    public synchronized void setRate(int permitsPerSecond, Ticker ticker)
+    {
+        Preconditions.checkArgument(permitsPerSecond > 0, "permits/second must 
be positive");
+        Preconditions.checkArgument(permitsPerSecond <= NANOS_PER_SECOND, 
"permits/second cannot be greater than " + NANOS_PER_SECOND);
+
+        this.ticker = ticker;
+        this.permitsPerSecond = permitsPerSecond;
+        intervalNanos = NANOS_PER_SECOND / permitsPerSecond;
+        nextAvailable.set(nanosElapsed());
+    }
+
+    /**
+     * @return the number of available permits per second
+     */
+    public int getRate()
+    {
+        return permitsPerSecond;
+    }
+
+    /**
+     * Reserves a single permit slot on the timeline which may not yet be 
available.
+     *
+     * @return time until the reserved permit will be available (or zero if it 
already is) in the specified units
+     */
+    public long reserveAndGetDelay(TimeUnit delayUnit)
+    {
+        long nowNanos = nanosElapsed();
+
+        for (;;)
+        {
+            long prev = nextAvailable.get();
+            long interval = this.intervalNanos;
+
+            // Push the first available permit slot up to the burst window if 
necessary.
+            long firstAvailable = Math.max(prev, nowNanos - burstNanos);
+
+            // Advance the configured interval starting from the bounded 
previous permit slot.
+            if (nextAvailable.compareAndSet(prev, firstAvailable + interval))
+                // If the time now is before the first available slot, return 
the delay.  
+                return delayUnit.convert(Math.max(0,  firstAvailable - 
nowNanos), TimeUnit.NANOSECONDS);
+        }
+    }
+
+    /**
+     * Reserves a single permit slot on the timeline, but only if one is 
available.
+     *
+     * @return true if a permit is available, false if one is not
+     */
+    public boolean tryReserve()
+    {
+        long nowNanos = nanosElapsed();
+    
+        for (;;)
+        {
+            long prev = nextAvailable.get();
+            long interval = this.intervalNanos;
+    
+            // Push the first available permit slot up to the burst window if 
necessary.
+            long firstAvailable = Math.max(prev, nowNanos - burstNanos);
+            
+            // If we haven't reached the time for the first available permit, 
we've failed to reserve. 
+            if (nowNanos < firstAvailable)
+                return false;
+    
+            // Advance the configured interval starting from the bounded 
previous permit slot.
+            // If another thread has already taken the next slot, retry.
+            if (nextAvailable.compareAndSet(prev, firstAvailable + interval))
+                return true;
+        }
+    }
+
+    @VisibleForTesting
+    public long getIntervalNanos()
+    {
+        return intervalNanos;
+    }
+    
+    @VisibleForTesting
+    public long getStartedNanos()
+    {
+        return startedNanos;
+    }
+
+    private long nanosElapsed()
+    {
+        return ticker.read() - startedNanos;
+    }
+
+    public static final NonBlockingRateLimiter NO_OP_LIMITER = new 
NonBlockingRateLimiter(toIntExact(NANOS_PER_SECOND))
+    {
+        @Override
+        public long reserveAndGetDelay(TimeUnit delayUnit) {
+            return 0;
+        }
+
+        @Override
+        public boolean tryReserve()
+        {
+            return true;
+        }
+    };
+}
diff --git a/test/burn/org/apache/cassandra/transport/BurnTestUtil.java 
b/test/burn/org/apache/cassandra/transport/BurnTestUtil.java
index 9610455..c8017d1 100644
--- a/test/burn/org/apache/cassandra/transport/BurnTestUtil.java
+++ b/test/burn/org/apache/cassandra/transport/BurnTestUtil.java
@@ -35,6 +35,9 @@ import org.apache.cassandra.net.AbstractMessageHandler;
 import org.apache.cassandra.net.ResourceLimits;
 import org.apache.cassandra.transport.messages.QueryMessage;
 import org.apache.cassandra.transport.messages.ResultMessage;
+import org.apache.cassandra.utils.concurrent.NonBlockingRateLimiter;
+
+import static 
org.apache.cassandra.utils.concurrent.NonBlockingRateLimiter.NO_OP_LIMITER;
 
 public class BurnTestUtil
 {
@@ -143,6 +146,12 @@ public class BurnTestUtil
                     return delegate.endpointWaitQueue();
                 }
 
+                @Override
+                public NonBlockingRateLimiter requestRateLimiter()
+                {
+                    return NO_OP_LIMITER;
+                }
+                
                 public void release()
                 {
                     delegate.release();
diff --git a/test/burn/org/apache/cassandra/transport/SimpleClientPerfTest.java 
b/test/burn/org/apache/cassandra/transport/SimpleClientPerfTest.java
index 20b8cb3..0427ad9 100644
--- a/test/burn/org/apache/cassandra/transport/SimpleClientPerfTest.java
+++ b/test/burn/org/apache/cassandra/transport/SimpleClientPerfTest.java
@@ -23,12 +23,14 @@ import java.net.ServerSocket;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 
-import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+import com.google.common.util.concurrent.RateLimiter;
+import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -41,11 +43,13 @@ import org.apache.cassandra.auth.AllowAllAuthorizer;
 import org.apache.cassandra.auth.AllowAllNetworkAuthorizer;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.EncryptionOptions;
+import org.apache.cassandra.exceptions.OverloadedException;
 import org.apache.cassandra.metrics.ClientMetrics;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.transport.messages.QueryMessage;
 import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.utils.AssertUtil;
+import org.apache.cassandra.utils.Throwables;
 
 import static org.apache.cassandra.transport.BurnTestUtil.SizeCaps;
 import static org.apache.cassandra.transport.BurnTestUtil.generateQueryMessage;
@@ -57,7 +61,7 @@ public class SimpleClientPerfTest
     @Parameterized.Parameter
     public ProtocolVersion version;
 
-    @Parameterized.Parameters()
+    @Parameterized.Parameters(name="{0}")
     public static Collection<Object[]> versions()
     {
         return ProtocolVersion.SUPPORTED.stream()
@@ -90,6 +94,7 @@ public class SimpleClientPerfTest
         }
     }
 
+    @SuppressWarnings({"IOResourceOpenedButNotSafelyClosed", "resource"})
     @Test
     public void measureSmall() throws Throwable
     {
@@ -102,6 +107,7 @@ public class SimpleClientPerfTest
                  version);
     }
 
+    @SuppressWarnings({"IOResourceOpenedButNotSafelyClosed", "resource"})
     @Test
     public void measureSmallWithCompression() throws Throwable
     {
@@ -114,6 +120,7 @@ public class SimpleClientPerfTest
                  version);
     }
 
+    @SuppressWarnings({"IOResourceOpenedButNotSafelyClosed", "resource"})
     @Test
     public void measureLarge() throws Throwable
     {
@@ -126,6 +133,7 @@ public class SimpleClientPerfTest
                  version);
     }
 
+    @SuppressWarnings({"IOResourceOpenedButNotSafelyClosed", "resource"})
     @Test
     public void measureLargeWithCompression() throws Throwable
     {
@@ -138,6 +146,7 @@ public class SimpleClientPerfTest
                  version);
     }
 
+    @SuppressWarnings({"UnstableApiUsage", "UseOfSystemOutOrSystemErr", 
"ResultOfMethodCallIgnored"})
     public void perfTest(SizeCaps requestCaps, SizeCaps responseCaps, 
AssertUtil.ThrowingSupplier<SimpleClient> clientSupplier, ProtocolVersion 
version) throws Throwable
     {
         ResultMessage.Rows response = generateRows(0, responseCaps);
@@ -190,7 +199,9 @@ public class SimpleClientPerfTest
         AtomicBoolean measure = new AtomicBoolean(false);
         DescriptiveStatistics stats = new DescriptiveStatistics();
         Lock lock = new ReentrantLock();
-
+        RateLimiter limiter = RateLimiter.create(2000);
+        AtomicLong overloadedExceptions = new AtomicLong(0);
+        
         // TODO: exercise client -> server large messages
         for (int t = 0; t < threads; t++)
         {
@@ -203,24 +214,53 @@ public class SimpleClientPerfTest
                         for (int j = 0; j < 1; j++)
                             messages.add(requestMessage);
 
-                        if (measure.get())
-                        {
-                            long nanoStart = System.nanoTime();
-                            client.execute(messages);
-                            long diff = System.nanoTime() - nanoStart;
-
-                            lock.lock();
-                            try
+                            if (measure.get())
                             {
-                                
stats.addValue(TimeUnit.MICROSECONDS.toMillis(diff));
+                                try
+                                {
+                                    limiter.acquire();
+                                    long nanoStart = System.nanoTime();
+                                    client.execute(messages);
+                                    long elapsed = System.nanoTime() - 
nanoStart;
+
+                                    lock.lock();
+                                    try
+                                    {
+                                        
stats.addValue(TimeUnit.NANOSECONDS.toMicros(elapsed));
+                                    }
+                                    finally
+                                    {
+                                        lock.unlock();
+                                    }
+                                }
+                                catch (RuntimeException e)
+                                {
+                                    if (Throwables.anyCauseMatches(e, cause -> 
cause instanceof OverloadedException))
+                                    {
+                                        overloadedExceptions.incrementAndGet();
+                                    }
+                                    else
+                                    {
+                                        throw e;
+                                    }
+                                }
                             }
-                            finally
+                            else
                             {
-                                lock.unlock();
+                                try
+                                {
+                                    limiter.acquire();
+                                    client.execute(messages); // warm-up
+                                }
+                                catch (RuntimeException e)
+                                {
+                                    // Ignore overloads during warmup...
+                                    if (!Throwables.anyCauseMatches(e, cause 
-> cause instanceof OverloadedException))
+                                    {
+                                        throw e;
+                                    }
+                                }
                             }
-                        }
-                        else
-                            client.execute(messages); // warm-up
                     }
                 }
                 catch (Throwable e)
@@ -240,14 +280,19 @@ public class SimpleClientPerfTest
 
         System.out.println("requestSize = " + requestSize);
         System.out.println("responseSize = " + responseSize);
+
+        System.out.println("Latencies (in microseconds)");
+        System.out.println("Elements: " + stats.getN());
         System.out.println("Mean:     " + stats.getMean());
         System.out.println("Variance: " + stats.getVariance());
         System.out.println("Median:   " + stats.getPercentile(0.5));
         System.out.println("90p:      " + stats.getPercentile(0.90));
         System.out.println("95p:      " + stats.getPercentile(0.95));
         System.out.println("99p:      " + stats.getPercentile(0.99));
+        System.out.println("Max:      " + stats.getMax());
+        
+        System.out.println("Failed due to overload: " + 
overloadedExceptions.get());
 
         server.stop();
     }
 }
-
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java 
b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index 59968fe..f3c279c 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -50,6 +50,7 @@ import org.junit.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.codahale.metrics.Gauge;
 import com.datastax.driver.core.*;
 import com.datastax.driver.core.DataType;
 import com.datastax.driver.core.ResultSet;
@@ -63,6 +64,7 @@ import org.apache.cassandra.index.SecondaryIndexManager;
 import org.apache.cassandra.config.EncryptionOptions;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.metrics.CassandraMetricsRegistry;
 import org.apache.cassandra.metrics.ClientMetrics;
 import org.apache.cassandra.schema.*;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -90,7 +92,10 @@ import org.apache.cassandra.utils.JMXServerUtils;
 
 import static 
com.datastax.driver.core.SocketOptions.DEFAULT_CONNECT_TIMEOUT_MILLIS;
 import static 
com.datastax.driver.core.SocketOptions.DEFAULT_READ_TIMEOUT_MILLIS;
-import static junit.framework.Assert.assertNotNull;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * Base class for CQL tests.
@@ -941,6 +946,23 @@ public abstract class CQLTester
         Assert.assertEquals(expectedArgTypes != null ? 
Arrays.asList(expectedArgTypes) : null, schemaChange.argTypes);
     }
 
+    protected static void assertWarningsContain(Message.Response response, 
String message)
+    {
+        List<String> warnings = response.getWarnings();
+        Assert.assertNotNull(warnings);
+        assertTrue(warnings.stream().anyMatch(s -> s.contains(message)));
+    }
+
+    protected static void assertNoWarningContains(Message.Response response, 
String message)
+    {
+        List<String> warnings = response.getWarnings();
+        
+        if (warnings != null) 
+        {
+            assertFalse(warnings.stream().anyMatch(s -> s.contains(message)));
+        }
+    }
+
     protected static ResultMessage schemaChange(String query)
     {
         try
@@ -1806,6 +1828,17 @@ public abstract class CQLTester
         return clusters.get(protocolVersion).getMetadata().newTupleType(types);
     }
 
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    protected static Gauge<Integer> getPausedConnectionsGauge()
+    {
+        String metricName = 
"org.apache.cassandra.metrics.Client.PausedConnections";
+        Map<String, Gauge> metrics = 
CassandraMetricsRegistry.Metrics.getGauges((name, metric) -> 
name.equals(metricName));
+        if (metrics.size() != 1)
+            fail(String.format("Expected a single registered metric for paused 
client connections, found %s",
+                               metrics.size()));
+        return metrics.get(metricName);
+    }
+
     // Attempt to find an AbstracType from a value (for serialization/printing 
sake).
     // Will work as long as we use types we know of, which is good enough for 
testing
     private static AbstractType typeFor(Object value)
diff --git a/test/unit/org/apache/cassandra/net/ResourceLimitsTest.java 
b/test/unit/org/apache/cassandra/net/ResourceLimitsTest.java
index f2f8a01..5c2ecbe 100644
--- a/test/unit/org/apache/cassandra/net/ResourceLimitsTest.java
+++ b/test/unit/org/apache/cassandra/net/ResourceLimitsTest.java
@@ -23,9 +23,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.function.LongFunction;
 
-import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.KillerForTests;
 import org.junit.Assert;
diff --git a/test/unit/org/apache/cassandra/transport/CQLConnectionTest.java 
b/test/unit/org/apache/cassandra/transport/CQLConnectionTest.java
index 96f28a2..73950ae 100644
--- a/test/unit/org/apache/cassandra/transport/CQLConnectionTest.java
+++ b/test/unit/org/apache/cassandra/transport/CQLConnectionTest.java
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.*;
 
+import org.apache.cassandra.transport.ClientResourceLimits.Overload;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -55,10 +56,12 @@ import 
org.apache.cassandra.transport.CQLMessageHandler.MessageConsumer;
 import org.apache.cassandra.transport.messages.*;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.SimpleCondition;
+import org.apache.cassandra.utils.concurrent.NonBlockingRateLimiter;
 
 import static 
org.apache.cassandra.config.EncryptionOptions.TlsEncryptionPolicy.UNENCRYPTED;
 import static org.apache.cassandra.net.FramingTest.randomishBytes;
 import static org.apache.cassandra.transport.Flusher.MAX_FRAMED_PAYLOAD_SIZE;
+import static 
org.apache.cassandra.utils.concurrent.NonBlockingRateLimiter.NO_OP_LIMITER;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -622,7 +625,7 @@ public class CQLConnectionTest
             this.frameEncoder = frameEncoder;
         }
 
-        public void accept(Channel channel, Message.Request message, 
Dispatcher.FlushItemConverter toFlushItem)
+        public void accept(Channel channel, Message.Request message, 
Dispatcher.FlushItemConverter toFlushItem, Overload backpressure)
         {
             if (flusher == null)
                 flusher = new SimpleClient.SimpleFlusher(frameEncoder);
@@ -755,6 +758,12 @@ public class CQLConnectionTest
                     return delegate.endpointWaitQueue();
                 }
 
+                @Override
+                public NonBlockingRateLimiter requestRateLimiter()
+                {
+                    return NO_OP_LIMITER;
+                }
+                
                 public void release()
                 {
                     delegate.release();
diff --git 
a/test/unit/org/apache/cassandra/transport/ClientResourceLimitsTest.java 
b/test/unit/org/apache/cassandra/transport/ClientResourceLimitsTest.java
index 9cc900a..5cea90c 100644
--- a/test/unit/org/apache/cassandra/transport/ClientResourceLimitsTest.java
+++ b/test/unit/org/apache/cassandra/transport/ClientResourceLimitsTest.java
@@ -19,7 +19,6 @@
 package org.apache.cassandra.transport;
 
 import java.io.IOException;
-import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.TimeUnit;
@@ -29,9 +28,9 @@ import java.util.function.Supplier;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.primitives.Ints;
+import org.apache.cassandra.service.StorageService;
 import org.junit.*;
 
-import com.codahale.metrics.Gauge;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.cql3.QueryOptions;
@@ -40,13 +39,11 @@ import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.db.virtual.*;
 import org.apache.cassandra.exceptions.OverloadedException;
-import org.apache.cassandra.metrics.CassandraMetricsRegistry;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.transport.messages.QueryMessage;
 import org.apache.cassandra.utils.FBUtilities;
 import org.awaitility.Awaitility;
 
-import static org.apache.cassandra.Util.spinAssertEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
@@ -55,19 +52,18 @@ import static org.junit.Assert.fail;
 
 public class ClientResourceLimitsTest extends CQLTester
 {
-
     private static final long LOW_LIMIT = 600L;
     private static final long HIGH_LIMIT = 5000000000L;
 
-    private static final QueryOptions V5_DEFAULT_OPTIONS = QueryOptions.create(
-        QueryOptions.DEFAULT.getConsistency(),
-        QueryOptions.DEFAULT.getValues(),
-        QueryOptions.DEFAULT.skipMetadata(),
-        QueryOptions.DEFAULT.getPageSize(),
-        QueryOptions.DEFAULT.getPagingState(),
-        QueryOptions.DEFAULT.getSerialConsistency(),
-        ProtocolVersion.V5,
-        KEYSPACE);
+    private static final QueryOptions V5_DEFAULT_OPTIONS = 
+        QueryOptions.create(QueryOptions.DEFAULT.getConsistency(),
+                            QueryOptions.DEFAULT.getValues(),
+                            QueryOptions.DEFAULT.skipMetadata(),
+                            QueryOptions.DEFAULT.getPageSize(),
+                            QueryOptions.DEFAULT.getPagingState(),
+                            QueryOptions.DEFAULT.getSerialConsistency(),
+                            ProtocolVersion.V5,
+                            KEYSPACE);
 
     @BeforeClass
     public static void setUp()
@@ -75,7 +71,7 @@ public class ClientResourceLimitsTest extends CQLTester
         DatabaseDescriptor.setNativeTransportReceiveQueueCapacityInBytes(1);
         
DatabaseDescriptor.setNativeTransportMaxConcurrentRequestsInBytesPerIp(LOW_LIMIT);
         
DatabaseDescriptor.setNativeTransportMaxConcurrentRequestsInBytes(LOW_LIMIT);
-
+        
         // The driver control connections would send queries that might 
interfere with the tests.
         requireNetworkWithoutDriver();
     }
@@ -124,7 +120,7 @@ public class ClientResourceLimitsTest extends CQLTester
         }
     }
 
-    @SuppressWarnings("resource")
+    @SuppressWarnings({"resource", "SameParameterValue"})
     private SimpleClient client(boolean throwOnOverload, int 
largeMessageThreshold)
     {
         try
@@ -184,27 +180,34 @@ public class ClientResourceLimitsTest extends CQLTester
                          (provider) -> provider.endpointWaitQueue().signal());
     }
 
-    private void backPressureTest(Runnable limitLifter, 
Consumer<ClientResourceLimits.ResourceProvider> signaller)
-    throws Throwable
+    private void backPressureTest(Runnable limitLifter, 
Consumer<ClientResourceLimits.ResourceProvider> signaller) throws Throwable
     {
         final AtomicReference<Exception> error = new AtomicReference<>();
         final CountDownLatch started = new CountDownLatch(1);
         final CountDownLatch complete = new CountDownLatch(1);
-        try(SimpleClient client = client(false))
+        
+        try (SimpleClient client = client(false))
         {
-            QueryMessage queryMessage = new QueryMessage("CREATE TABLE atable 
(pk int PRIMARY KEY, v text)",
-                                                         V5_DEFAULT_OPTIONS);
-            client.execute(queryMessage);
-
-            // There should be no paused client connections yet
-            Gauge<Integer> pausedConnections = getPausedConnectionsGauge();
-            int before = pausedConnections.getValue();
-
+            // The first query does not trigger backpressure/pause the 
connection:
+            QueryMessage queryMessage = 
+                    new QueryMessage("CREATE TABLE atable (pk int PRIMARY KEY, 
v text)", V5_DEFAULT_OPTIONS);
+            Message.Response belowThresholdResponse = 
client.execute(queryMessage);
+            assertEquals(0, getPausedConnectionsGauge().getValue().intValue());
+            assertNoWarningContains(belowThresholdResponse, "bytes in flight");
+            
+            // A second query triggers backpressure but is allowed to 
complete...
+            Message.Response aboveThresholdResponse = 
client.execute(queryMessage());
+            assertEquals(1, getPausedConnectionsGauge().getValue().intValue());
+            assertWarningsContain(aboveThresholdResponse, "bytes in flight");
+
+            // ...and a third request is paused.
+            final AtomicReference<Message.Response> response = new 
AtomicReference<>();
+            
             Thread t = new Thread(() -> {
                 try
                 {
                     started.countDown();
-                    client.execute(queryMessage());
+                    response.set(client.execute(queryMessage()));
                     complete.countDown();
                 }
                 catch (Exception e)
@@ -216,19 +219,13 @@ public class ClientResourceLimitsTest extends CQLTester
             });
             t.start();
 
-            // When the client attempts to execute the second query, the 
backpressure
-            // mechanism should report the client connection is paused
-            assertTrue(started.await(5, TimeUnit.SECONDS));
-            spinAssertEquals("Timed out after waiting 5 seconds for paused " +
-                             "connections metric to increment due to 
backpressure",
-                             before + 1, pausedConnections::getValue, 5, 
TimeUnit.SECONDS);
-
             // verify the request hasn't completed
             assertFalse(complete.await(1, TimeUnit.SECONDS));
 
             // backpressure has been applied, if we increase the limits of the 
exhausted reserve and signal
             // the appropriate WaitQueue, it should be released and the client 
request will complete
             limitLifter.run();
+            
             // We need a ResourceProvider to get access to the WaitQueue
             ClientResourceLimits.Allocator allocator = 
ClientResourceLimits.getAllocatorForEndpoint(FBUtilities.getJustLocalAddress());
             ClientResourceLimits.ResourceProvider queueHandle = new 
ClientResourceLimits.ResourceProvider.Default(allocator);
@@ -237,8 +234,10 @@ public class ClientResourceLimitsTest extends CQLTester
             // SimpleClient has a 10 second timeout, so if we have to wait
             // longer than that assume that we're not going to receive a
             // reply. If all's well, the completion should happen immediately
-            assertTrue(complete.await(11, TimeUnit.SECONDS));
+            assertTrue(complete.await(SimpleClient.TIMEOUT_SECONDS + 1, 
TimeUnit.SECONDS));
             assertNull(error.get());
+            assertEquals(0, getPausedConnectionsGauge().getValue().intValue());
+            assertNoWarningContains(response.get(), "bytes in flight");
         }
     }
 
@@ -304,17 +303,6 @@ public class ClientResourceLimitsTest extends CQLTester
         return new QueryMessage(query.toString(), V5_DEFAULT_OPTIONS);
     }
 
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    private Gauge<Integer> getPausedConnectionsGauge()
-    {
-        String metricName = 
"org.apache.cassandra.metrics.Client.PausedConnections";
-        Map<String, Gauge> metrics = 
CassandraMetricsRegistry.Metrics.getGauges((name, metric) -> 
name.equals(metricName));
-        if (metrics.size() != 1)
-            fail(String.format("Expected a single registered metric for paused 
client connections, found %s",
-                               metrics.size()));
-        return metrics.get(metricName);
-    }
-
     @Test
     public void testQueryUpdatesConcurrentMetricsUpdate() throws Throwable
     {
@@ -469,4 +457,23 @@ public class ClientResourceLimitsTest extends CQLTester
             client.close();
         }
     }
+
+    @Test
+    public void shouldChangeRequestsPerSecondAtRuntime()
+    {
+        StorageService.instance.setNativeTransportMaxRequestsPerSecond(100);
+        assertEquals(100, 
ClientResourceLimits.getNativeTransportMaxRequestsPerSecond(), 0);
+        assertEquals(100, 
ClientResourceLimits.GLOBAL_REQUEST_LIMITER.getRate(), 0);
+        assertEquals(100, 
StorageService.instance.getNativeTransportMaxRequestsPerSecond());
+
+        StorageService.instance.setNativeTransportMaxRequestsPerSecond(1000);
+        assertEquals(1000, 
ClientResourceLimits.getNativeTransportMaxRequestsPerSecond(), 0);
+        assertEquals(1000, 
ClientResourceLimits.GLOBAL_REQUEST_LIMITER.getRate(), 0);
+        assertEquals(1000, 
StorageService.instance.getNativeTransportMaxRequestsPerSecond());
+
+        StorageService.instance.setNativeTransportMaxRequestsPerSecond(500);
+        assertEquals(500, 
ClientResourceLimits.getNativeTransportMaxRequestsPerSecond(), 0);
+        assertEquals(500, 
ClientResourceLimits.GLOBAL_REQUEST_LIMITER.getRate(), 0);
+        assertEquals(500, 
StorageService.instance.getNativeTransportMaxRequestsPerSecond());
+    }
 }
diff --git a/test/unit/org/apache/cassandra/transport/RateLimitingTest.java 
b/test/unit/org/apache/cassandra/transport/RateLimitingTest.java
new file mode 100644
index 0000000..8497c01
--- /dev/null
+++ b/test/unit/org/apache/cassandra/transport/RateLimitingTest.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.transport;
+
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import com.google.common.base.Ticker;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.service.StorageService;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.exceptions.OverloadedException;
+import org.apache.cassandra.transport.messages.QueryMessage;
+import org.apache.cassandra.utils.Throwables;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import static org.apache.cassandra.Util.spinAssertEquals;
+import static org.apache.cassandra.transport.ProtocolVersion.V4;
+
+@SuppressWarnings("UnstableApiUsage")
+@RunWith(Parameterized.class)
+public class RateLimitingTest extends CQLTester
+{
+    public static final String BACKPRESSURE_WARNING_SNIPPET = "Request 
breached global limit";
+    
+    private static final int LARGE_PAYLOAD_THRESHOLD_BYTES = 1000;
+    private static final int OVERLOAD_PERMITS_PER_SECOND = 1;
+
+    @Parameterized.Parameter
+    public ProtocolVersion version;
+
+    @Parameterized.Parameters(name="{0}")
+    public static Collection<Object[]> versions()
+    {
+        return ProtocolVersion.SUPPORTED.stream()
+                                        .map(v -> new Object[]{v})
+                                        .collect(Collectors.toList());
+    }
+
+    private AtomicLong tick;
+    private Ticker ticker;
+
+    @BeforeClass
+    public static void setup()
+    {
+        // If we don't exceed the queue capacity, we won't actually use the 
global/endpoint 
+        // bytes-in-flight limits, and the assertions we make below around 
releasing them would be useless.
+        DatabaseDescriptor.setNativeTransportReceiveQueueCapacityInBytes(1);
+        
+        // The driver control connections would send queries that might 
interfere with the tests.
+        requireNetworkWithoutDriver();
+    }
+
+    @Before
+    public void resetLimits()
+    {
+        // Reset to the original start time in case a test advances the clock.
+        tick = new 
AtomicLong(ClientResourceLimits.GLOBAL_REQUEST_LIMITER.getStartedNanos());
+
+        ticker = new Ticker()
+        {
+            @Override
+            public long read()
+            {
+                return tick.get();
+            }
+        };
+
+        ClientResourceLimits.setGlobalLimit(Long.MAX_VALUE);
+    }
+
+    @Test
+    public void shouldThrowOnOverloadSmallMessages() throws Exception
+    {
+        int payloadSize = LARGE_PAYLOAD_THRESHOLD_BYTES / 4;
+        testOverload(payloadSize, true);
+    }
+
+    @Test
+    public void shouldThrowOnOverloadLargeMessages() throws Exception
+    {
+        int payloadSize = LARGE_PAYLOAD_THRESHOLD_BYTES * 2;
+        testOverload(payloadSize, true);
+    }
+
+    @Test
+    public void shouldBackpressureSmallMessages() throws Exception
+    {
+        int payloadSize = LARGE_PAYLOAD_THRESHOLD_BYTES / 4;
+        testOverload(payloadSize, false);
+    }
+
+    @Test
+    public void shouldBackpressureLargeMessages() throws Exception
+    {
+        int payloadSize = LARGE_PAYLOAD_THRESHOLD_BYTES * 2;
+        testOverload(payloadSize, false);
+    }
+
+    @Test
+    public void shouldReleaseSmallMessageOnBytesInFlightOverload() throws 
Exception
+    {
+        testBytesInFlightOverload(LARGE_PAYLOAD_THRESHOLD_BYTES / 4);
+    }
+
+    @Test
+    public void shouldReleaseLargeMessageOnBytesInFlightOverload() throws 
Exception
+    {
+        testBytesInFlightOverload(LARGE_PAYLOAD_THRESHOLD_BYTES * 2);
+    }
+
+    private void testBytesInFlightOverload(int payloadSize) throws Exception
+    {
+        try (SimpleClient client = client().connect(false, true))
+        {
+            
StorageService.instance.setNativeTransportRateLimitingEnabled(false);
+            QueryMessage queryMessage = new QueryMessage("CREATE TABLE IF NOT 
EXISTS " + KEYSPACE + ".atable (pk int PRIMARY KEY, v text)", queryOptions());
+            client.execute(queryMessage);
+
+            
StorageService.instance.setNativeTransportRateLimitingEnabled(true);
+            
ClientResourceLimits.GLOBAL_REQUEST_LIMITER.setRate(OVERLOAD_PERMITS_PER_SECOND,
 ticker);
+            ClientResourceLimits.setGlobalLimit(1);
+
+            try
+            {
+                // The first query takes the one available permit, but should 
fail on the bytes in flight limit.
+                client.execute(queryMessage(payloadSize));
+            }
+            catch (RuntimeException e)
+            {
+                assertTrue(Throwables.anyCauseMatches(e, cause -> cause 
instanceof OverloadedException));
+            }
+        }
+        finally
+        {
+            // Sanity check bytes in flight limiter.
+            assertEquals(0, ClientResourceLimits.getCurrentGlobalUsage());
+            
StorageService.instance.setNativeTransportRateLimitingEnabled(false);
+        }
+    }
+
+    private void testOverload(int payloadSize, boolean throwOnOverload) throws 
Exception
+    {
+        try (SimpleClient client = client().connect(false, throwOnOverload))
+        {
+            
StorageService.instance.setNativeTransportRateLimitingEnabled(false);
+            QueryMessage queryMessage = new QueryMessage("CREATE TABLE IF NOT 
EXISTS " + KEYSPACE + ".atable (pk int PRIMARY KEY, v text)", queryOptions());
+            client.execute(queryMessage);
+
+            
StorageService.instance.setNativeTransportRateLimitingEnabled(true);
+            
ClientResourceLimits.GLOBAL_REQUEST_LIMITER.setRate(OVERLOAD_PERMITS_PER_SECOND,
 ticker);
+
+            if (throwOnOverload)
+                testThrowOnOverload(payloadSize, client);
+            else
+            {
+                testBackpressureOnOverload(payloadSize, client);
+            }   
+        }
+        finally
+        {
+            // Sanity the check bytes in flight limiter.
+            assertEquals(0, ClientResourceLimits.getCurrentGlobalUsage());
+            
StorageService.instance.setNativeTransportRateLimitingEnabled(false);
+        }
+    }
+
+    private void testBackpressureOnOverload(int payloadSize, SimpleClient 
client) throws Exception
+    {
+        // The first query takes the one available permit.
+        Message.Response firstResponse = 
client.execute(queryMessage(payloadSize));
+        assertEquals(0, getPausedConnectionsGauge().getValue().intValue());
+        assertNoWarningContains(firstResponse, BACKPRESSURE_WARNING_SNIPPET);
+        
+        // The second query activates backpressure.
+        long overloadQueryStartTime = System.currentTimeMillis();
+        Message.Response response = client.execute(queryMessage(payloadSize));
+
+        // V3 does not support client warnings, but otherwise we should get 
one for this query.
+        if (version.isGreaterOrEqualTo(V4))
+            assertWarningsContain(response, BACKPRESSURE_WARNING_SNIPPET);
+
+        AtomicReference<Throwable> error = new AtomicReference<>();
+        CountDownLatch started = new CountDownLatch(1);
+        CountDownLatch complete = new CountDownLatch(1);
+        AtomicReference<Message.Response> pausedQueryResponse = new 
AtomicReference<>();
+        
+        Thread queryRunner = new Thread(() ->
+        {
+            try
+            {
+                started.countDown();
+                
pausedQueryResponse.set(client.execute(queryMessage(payloadSize)));
+                complete.countDown();
+            }
+            catch (Throwable t)
+            {
+                error.set(t);
+            }
+        });
+
+        // Advance the rater limiter so that this query will see an available 
permit. This also
+        // means it should not produce a client warning, which we verify below.
+        // (Note that we advance 2 intervals for the 2 prior queries.)
+        tick.addAndGet(2 * 
ClientResourceLimits.GLOBAL_REQUEST_LIMITER.getIntervalNanos());
+        
+        queryRunner.start();
+
+        // ...and the request should complete without error.
+        assertTrue(complete.await(SimpleClient.TIMEOUT_SECONDS + 1, 
TimeUnit.SECONDS));
+        assertNull(error.get());
+        assertNoWarningContains(pausedQueryResponse.get(), 
BACKPRESSURE_WARNING_SNIPPET);
+
+        // At least the number of milliseconds in the permit interval should 
already have elapsed 
+        // since the start of the query that pauses the connection.
+        double permitIntervalMillis = (double) TimeUnit.SECONDS.toMillis(1L) / 
OVERLOAD_PERMITS_PER_SECOND;
+        long sinceQueryStarted = System.currentTimeMillis() - 
overloadQueryStartTime;
+        long remainingMillis = ((long) permitIntervalMillis) - 
sinceQueryStarted;
+        assertTrue("Query completed before connection unpause!", 
remainingMillis <= 0);
+        
+        spinAssertEquals("Timed out after waiting 5 seconds for paused 
connections metric to normalize.",
+                         0, () -> getPausedConnectionsGauge().getValue(), 5, 
TimeUnit.SECONDS);
+    }
+
+    private void testThrowOnOverload(int payloadSize, SimpleClient client)
+    {
+        // The first query takes the one available permit...
+        client.execute(queryMessage(payloadSize));
+        
+        try
+        {   
+            // ...and the second breaches the limit....
+            client.execute(queryMessage(payloadSize));
+        }
+        catch (RuntimeException e)
+        {
+            assertTrue(Throwables.anyCauseMatches(e, cause -> cause instanceof 
OverloadedException));
+        }
+
+        // Advance the timeline and verify that we can take a permit again.
+        // (Note that we don't take one when we throw on overload.)
+        
tick.addAndGet(ClientResourceLimits.GLOBAL_REQUEST_LIMITER.getIntervalNanos());
+        client.execute(queryMessage(payloadSize));
+    }
+
+    private QueryMessage queryMessage(int length)
+    {
+        StringBuilder query = new StringBuilder("INSERT INTO " + KEYSPACE + 
".atable (pk, v) VALUES (1, '");
+        
+        for (int i = 0; i < length; i++)
+        {
+            query.append('a');
+        }
+        
+        query.append("')");
+        return new QueryMessage(query.toString(), queryOptions());
+    }
+
+    private SimpleClient client()
+    {
+        return SimpleClient.builder(nativeAddr.getHostAddress(), nativePort)
+                           .protocolVersion(version)
+                           .useBeta()
+                           
.largeMessageThreshold(LARGE_PAYLOAD_THRESHOLD_BYTES)
+                           .build();
+    }
+
+    private QueryOptions queryOptions()
+    {
+        return QueryOptions.create(QueryOptions.DEFAULT.getConsistency(),
+                                   QueryOptions.DEFAULT.getValues(),
+                                   QueryOptions.DEFAULT.skipMetadata(),
+                                   QueryOptions.DEFAULT.getPageSize(),
+                                   QueryOptions.DEFAULT.getPagingState(),
+                                   QueryOptions.DEFAULT.getSerialConsistency(),
+                                   version,
+                                   KEYSPACE);
+    }
+}
diff --git 
a/test/unit/org/apache/cassandra/utils/concurrent/NonBlockingRateLimiterTest.java
 
b/test/unit/org/apache/cassandra/utils/concurrent/NonBlockingRateLimiterTest.java
new file mode 100644
index 0000000..d63cef3
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/utils/concurrent/NonBlockingRateLimiterTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils.concurrent;
+
+import com.google.common.base.Ticker;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@SuppressWarnings("UnstableApiUsage")
+public class NonBlockingRateLimiterTest
+{
+    private static final AtomicLong CLOCK = new AtomicLong(0);
+    private static final TimeUnit DELAY_UNIT = TimeUnit.NANOSECONDS;
+
+    private static final Ticker TICKER = new Ticker()
+    {
+        @Override
+        public long read() {
+            return CLOCK.get();
+        }
+    };
+
+    @Before
+    public void resetTicker()
+    {
+        CLOCK.set(0);
+    }
+
+    @Test
+    public void testUnconditionalReservation()
+    {
+        NonBlockingRateLimiter limiter = new NonBlockingRateLimiter(4, 0, 
TICKER);
+        long oneSecond = DELAY_UNIT.convert(1, TimeUnit.SECONDS);
+        long oneDelay = oneSecond / 4;
+
+        // Delays should begin accumulating without any ticker movement...
+        assertEquals(0, limiter.reserveAndGetDelay(DELAY_UNIT));
+        assertEquals(oneDelay, limiter.reserveAndGetDelay(DELAY_UNIT));
+        assertEquals(oneDelay * 2, limiter.reserveAndGetDelay(DELAY_UNIT));
+        assertEquals(oneDelay * 3, limiter.reserveAndGetDelay(DELAY_UNIT));
+
+        // ...but should be gone after advancing enough to free up a permit.
+        CLOCK.addAndGet(NonBlockingRateLimiter.NANOS_PER_SECOND);
+        assertEquals(0, limiter.reserveAndGetDelay(DELAY_UNIT));
+    }
+
+    @Test
+    public void testConditionalReservation()
+    {
+        NonBlockingRateLimiter limiter = new NonBlockingRateLimiter(1, 0, 
TICKER);
+        
+        // Take the available permit, but then fail a subsequent attempt.
+        assertTrue(limiter.tryReserve());
+        assertFalse(limiter.tryReserve());
+
+        // We only need to advance one second, as the second attempt should 
not get a permit.
+        CLOCK.addAndGet(NonBlockingRateLimiter.NANOS_PER_SECOND);
+        assertTrue(limiter.tryReserve());
+    }
+
+    @Test
+    public void testBurstPermitConsumption()
+    {
+        // Create a limiter that produces 2 permits/second and allows 1-second 
bursts.
+        NonBlockingRateLimiter limiter = new NonBlockingRateLimiter(1, 
NonBlockingRateLimiter.DEFAULT_BURST_NANOS, TICKER);
+
+        // Advance the clock to create a 1-second idle period, which makes one 
burst permit available.
+        CLOCK.addAndGet(NonBlockingRateLimiter.NANOS_PER_SECOND);
+        
+        // Take the burst permit.
+        assertTrue(limiter.tryReserve());
+        
+        // Take the "normal" permit.
+        assertTrue(limiter.tryReserve());
+        
+        // Then fail, as we've consumed both.
+        assertFalse(limiter.tryReserve());
+
+        // Advance 1 interval again...
+        CLOCK.addAndGet(NonBlockingRateLimiter.NANOS_PER_SECOND);
+
+        // ...and only one permit should be available, as we've reached a 
steady state.
+        assertTrue(limiter.tryReserve());
+        assertFalse(limiter.tryReserve());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testMaximumRate()
+    {
+        new NonBlockingRateLimiter(Integer.MAX_VALUE, 0, 
Ticker.systemTicker());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testMinimumRate()
+    {
+        new NonBlockingRateLimiter(-1, 0, Ticker.systemTicker());
+    }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to