This is an automated email from the ASF dual-hosted git repository.
maedhroz pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new d220d24 Request-Based Native Transport Rate-Limiting
d220d24 is described below
commit d220d24994400d4342f5281f1a51514a6ae8c2fd
Author: Caleb Rackliffe <[email protected]>
AuthorDate: Thu Aug 19 10:55:58 2021 -0500
Request-Based Native Transport Rate-Limiting
patch by Caleb Rackliffe; reviewed by Benedict Elliott Smith and Josh
McKenzie for CASSANDRA-16663
---
CHANGES.txt | 1 +
src/java/org/apache/cassandra/config/Config.java | 4 +-
.../cassandra/config/DatabaseDescriptor.java | 27 ++
.../cassandra/net/AbstractMessageHandler.java | 21 +-
.../org/apache/cassandra/net/FrameDecoder.java | 13 +-
.../cassandra/net/InboundMessageHandler.java | 2 -
.../org/apache/cassandra/net/ResourceLimits.java | 4 +-
.../apache/cassandra/service/StorageService.java | 24 ++
.../cassandra/service/StorageServiceMBean.java | 6 +-
.../cassandra/transport/CQLMessageHandler.java | 254 +++++++++++++----
.../cassandra/transport/ClientResourceLimits.java | 41 ++-
.../org/apache/cassandra/transport/Dispatcher.java | 37 ++-
.../cassandra/transport/ExceptionHandlers.java | 44 +--
.../transport/InitialConnectionHandler.java | 3 +-
.../apache/cassandra/transport/PreV5Handlers.java | 186 ++++++++-----
.../apache/cassandra/transport/SimpleClient.java | 51 ++--
.../utils/concurrent/NonBlockingRateLimiter.java | 188 +++++++++++++
.../apache/cassandra/transport/BurnTestUtil.java | 9 +
.../cassandra/transport/SimpleClientPerfTest.java | 81 ++++--
test/unit/org/apache/cassandra/cql3/CQLTester.java | 35 ++-
.../apache/cassandra/net/ResourceLimitsTest.java | 2 -
.../cassandra/transport/CQLConnectionTest.java | 11 +-
.../transport/ClientResourceLimitsTest.java | 101 +++----
.../cassandra/transport/RateLimitingTest.java | 309 +++++++++++++++++++++
.../concurrent/NonBlockingRateLimiterTest.java | 121 ++++++++
25 files changed, 1320 insertions(+), 255 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 5ae32a5..275ebc7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.1
+ * Request-Based Native Transport Rate-Limiting (CASSANDRA-16663)
* Implement nodetool getauditlog command (CASSANDRA-16725)
* Clean up repair code (CASSANDRA-13720)
* Background schedule to clean up orphaned hints files (CASSANDRA-16815)
diff --git a/src/java/org/apache/cassandra/config/Config.java
b/src/java/org/apache/cassandra/config/Config.java
index 9a01db6..f7edda0 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -34,8 +34,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.audit.AuditLogOptions;
-import org.apache.cassandra.fql.FullQueryLoggerOptions;
import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.fql.FullQueryLoggerOptions;
/**
* A class that contains configuration properties for the cassandra node it
runs within.
@@ -193,6 +193,8 @@ public class Config
public volatile boolean native_transport_allow_older_protocols = true;
public volatile long
native_transport_max_concurrent_requests_in_bytes_per_ip = -1L;
public volatile long native_transport_max_concurrent_requests_in_bytes =
-1L;
+ public volatile boolean native_transport_rate_limiting_enabled = false;
+ public volatile int native_transport_max_requests_per_second = 1000000;
public int native_transport_receive_queue_capacity_in_bytes = 1 << 20; //
1MiB
@Deprecated
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index f2122b1..722e63d 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -540,6 +540,11 @@ public class DatabaseDescriptor
{
conf.native_transport_max_concurrent_requests_in_bytes_per_ip =
Runtime.getRuntime().maxMemory() / 40;
}
+
+ if (conf.native_transport_rate_limiting_enabled)
+ logger.info("Native transport rate-limiting enabled at {}
requests/second.", conf.native_transport_max_requests_per_second);
+ else
+ logger.info("Native transport rate-limiting disabled.");
if (conf.commitlog_total_space_in_mb == null)
{
@@ -2324,6 +2329,28 @@ public class DatabaseDescriptor
conf.native_transport_max_concurrent_requests_in_bytes =
maxConcurrentRequestsInBytes;
}
+ public static int getNativeTransportMaxRequestsPerSecond()
+ {
+ return conf.native_transport_max_requests_per_second;
+ }
+
+ public static void setNativeTransportMaxRequestsPerSecond(int perSecond)
+ {
+ Preconditions.checkArgument(perSecond > 0,
"native_transport_max_requests_per_second must be greater than zero");
+ conf.native_transport_max_requests_per_second = perSecond;
+ }
+
+ public static void setNativeTransportRateLimitingEnabled(boolean enabled)
+ {
+ logger.info("native_transport_rate_limiting_enabled set to {}",
enabled);
+ conf.native_transport_rate_limiting_enabled = enabled;
+ }
+
+ public static boolean getNativeTransportRateLimitingEnabled()
+ {
+ return conf.native_transport_rate_limiting_enabled;
+ }
+
public static int getCommitLogSyncPeriod()
{
return conf.commitlog_sync_period_in_ms;
diff --git a/src/java/org/apache/cassandra/net/AbstractMessageHandler.java
b/src/java/org/apache/cassandra/net/AbstractMessageHandler.java
index d709729..1045f28 100644
--- a/src/java/org/apache/cassandra/net/AbstractMessageHandler.java
+++ b/src/java/org/apache/cassandra/net/AbstractMessageHandler.java
@@ -22,7 +22,6 @@ import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
@@ -34,13 +33,13 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoop;
+import org.apache.cassandra.metrics.ClientMetrics;
import org.apache.cassandra.net.FrameDecoder.CorruptFrame;
import org.apache.cassandra.net.FrameDecoder.Frame;
import org.apache.cassandra.net.FrameDecoder.FrameProcessor;
import org.apache.cassandra.net.FrameDecoder.IntactFrame;
import org.apache.cassandra.net.Message.Header;
import org.apache.cassandra.net.ResourceLimits.Limit;
-import org.apache.cassandra.utils.NoSpamLogger;
import static java.lang.Math.max;
import static java.lang.Math.min;
@@ -98,8 +97,8 @@ import static
org.apache.cassandra.utils.MonotonicClock.approxTime;
* the untouched frames to the correct thread pool for the verb to be
deserialized there and immediately processed.
*
* See {@link LargeMessage} and subclasses for concrete {@link
AbstractMessageHandler} implementations for details
- * of the large-message accumulating state-machine, and {@link ProcessMessage}
and its inheritors for the differences
- *in execution.
+ * of the large-message accumulating state-machine, and {@link
InboundMessageHandler.ProcessMessage} and its inheritors
+ * for the differences in execution.
*
* # Flow control (backpressure)
*
@@ -135,8 +134,7 @@ import static
org.apache.cassandra.utils.MonotonicClock.approxTime;
public abstract class AbstractMessageHandler extends
ChannelInboundHandlerAdapter implements FrameProcessor
{
private static final Logger logger =
LoggerFactory.getLogger(AbstractMessageHandler.class);
- private static final NoSpamLogger noSpamLogger =
NoSpamLogger.getLogger(logger, 1L, TimeUnit.SECONDS);
-
+
protected final FrameDecoder decoder;
protected final Channel channel;
@@ -304,12 +302,17 @@ public abstract class AbstractMessageHandler extends
ChannelInboundHandlerAdapte
try
{
/*
- * Process up to one message using supplied overriden reserves -
one of them pre-allocated,
- * and guaranteed to be enough for one message - then, if no
obstacles enountered, reactivate
+ * Process up to one message using supplied overridden reserves -
one of them pre-allocated,
+ * and guaranteed to be enough for one message - then, if no
obstacles encountered, reactivate
* the frame decoder using normal reserve capacities.
*/
if (processUpToOneMessage(endpointReserve, globalReserve))
+ {
decoder.reactivate();
+
+ if (decoder.isActive())
+ ClientMetrics.instance.unpauseConnection();
+ }
}
catch (Throwable t)
{
@@ -321,7 +324,7 @@ public abstract class AbstractMessageHandler extends
ChannelInboundHandlerAdapte
// return true if the handler should be reactivated - if no new hurdles
were encountered,
// like running out of the other kind of reserve capacity
- private boolean processUpToOneMessage(Limit endpointReserve, Limit
globalReserve) throws IOException
+ protected boolean processUpToOneMessage(Limit endpointReserve, Limit
globalReserve) throws IOException
{
UpToOneMessageFrameProcessor processor = new
UpToOneMessageFrameProcessor(endpointReserve, globalReserve);
decoder.processBacklog(processor);
diff --git a/src/java/org/apache/cassandra/net/FrameDecoder.java
b/src/java/org/apache/cassandra/net/FrameDecoder.java
index 64e30ef..4cfbf6d 100644
--- a/src/java/org/apache/cassandra/net/FrameDecoder.java
+++ b/src/java/org/apache/cassandra/net/FrameDecoder.java
@@ -191,6 +191,14 @@ public abstract class FrameDecoder extends
ChannelInboundHandlerAdapter
abstract void addLastTo(ChannelPipeline pipeline);
/**
+ * @return true if we are actively decoding and processing frames
+ */
+ public boolean isActive()
+ {
+ return isActive;
+ }
+
+ /**
* For use by InboundMessageHandler (or other upstream handlers) that want
to start receiving frames.
*/
public void activate(FrameProcessor processor)
@@ -208,7 +216,7 @@ public abstract class FrameDecoder extends
ChannelInboundHandlerAdapter
* For use by InboundMessageHandler (or other upstream handlers) that want
to resume
* receiving frames after previously indicating that processing should be
paused.
*/
- void reactivate() throws IOException
+ public void reactivate() throws IOException
{
if (isActive)
throw new IllegalStateException("Tried to reactivate an already
active FrameDecoder");
@@ -282,7 +290,8 @@ public abstract class FrameDecoder extends
ChannelInboundHandlerAdapter
{
decode(frames, bytes);
- if (isActive) isActive = deliver(processor);
+ if (isActive)
+ isActive = deliver(processor);
}
@Override
diff --git a/src/java/org/apache/cassandra/net/InboundMessageHandler.java
b/src/java/org/apache/cassandra/net/InboundMessageHandler.java
index 64d0a8c..f29b3ec 100644
--- a/src/java/org/apache/cassandra/net/InboundMessageHandler.java
+++ b/src/java/org/apache/cassandra/net/InboundMessageHandler.java
@@ -34,8 +34,6 @@ import
org.apache.cassandra.exceptions.IncompatibleSchemaException;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.Message.Header;
-import org.apache.cassandra.net.FrameDecoder.Frame;
-import org.apache.cassandra.net.FrameDecoder.FrameProcessor;
import org.apache.cassandra.net.FrameDecoder.IntactFrame;
import org.apache.cassandra.net.FrameDecoder.CorruptFrame;
import org.apache.cassandra.net.ResourceLimits.Limit;
diff --git a/src/java/org/apache/cassandra/net/ResourceLimits.java
b/src/java/org/apache/cassandra/net/ResourceLimits.java
index 8899040..cfddfc3 100644
--- a/src/java/org/apache/cassandra/net/ResourceLimits.java
+++ b/src/java/org/apache/cassandra/net/ResourceLimits.java
@@ -41,7 +41,7 @@ public abstract class ResourceLimits
* Sets the total amount of permits represented by this {@link Limit}
- the capacity
*
* If the old limit has been reached and the new limit is large enough
to allow for more
- * permits to be aqcuired, subsequent calls to {@link #allocate(long)}
or {@link #tryAllocate(long)}
+ * permits to be acquired, subsequent calls to {@link #allocate(long)}
or {@link #tryAllocate(long)}
* will succeed.
*
* If the new limit is lower than the current amount of allocated
permits then subsequent calls
@@ -163,7 +163,7 @@ public abstract class ResourceLimits
// possible it would require synchronizing the closing of all
outbound connections
// and reinitializing the Concurrent limit before reopening.
For such an unlikely path
// (previously this was an assert), it is safer to terminate
the JVM and have something external
- // restart and get back to a known good state rather than
intermittendly crashing on any of
+ // restart and get back to a known good state rather than
intermittently crashing on any of
// the connections sharing this limit.
throw new UnrecoverableIllegalStateException(
"Internode messaging byte limits that are shared between
connections is invalid (using="+using+")");
diff --git a/src/java/org/apache/cassandra/service/StorageService.java
b/src/java/org/apache/cassandra/service/StorageService.java
index 396098b..66e1713 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -5875,6 +5875,30 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
ClientResourceLimits.setEndpointLimit(newLimit);
}
+ @Override
+ public int getNativeTransportMaxRequestsPerSecond()
+ {
+ return ClientResourceLimits.getNativeTransportMaxRequestsPerSecond();
+ }
+
+ @Override
+ public void setNativeTransportMaxRequestsPerSecond(int newPerSecond)
+ {
+
ClientResourceLimits.setNativeTransportMaxRequestsPerSecond(newPerSecond);
+ }
+
+ @Override
+ public void setNativeTransportRateLimitingEnabled(boolean enabled)
+ {
+ DatabaseDescriptor.setNativeTransportRateLimitingEnabled(enabled);
+ }
+
+ @Override
+ public boolean getNativeTransportRateLimitingEnabled()
+ {
+ return DatabaseDescriptor.getNativeTransportRateLimitingEnabled();
+ }
+
@VisibleForTesting
public void shutdownServer()
{
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 4622d06..e53eaf9 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -35,7 +35,6 @@ import javax.management.openmbean.TabularData;
import org.apache.cassandra.db.ColumnFamilyStoreMBean;
import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.locator.InetAddressAndPort;
public interface StorageServiceMBean extends NotificationEmitter
{
@@ -556,7 +555,10 @@ public interface StorageServiceMBean extends
NotificationEmitter
public void setNativeTransportMaxConcurrentRequestsInBytes(long newLimit);
public long getNativeTransportMaxConcurrentRequestsInBytesPerIp();
public void setNativeTransportMaxConcurrentRequestsInBytesPerIp(long
newLimit);
-
+ public int getNativeTransportMaxRequestsPerSecond();
+ public void setNativeTransportMaxRequestsPerSecond(int newPerSecond);
+ public void setNativeTransportRateLimitingEnabled(boolean enabled);
+ public boolean getNativeTransportRateLimitingEnabled();
// allows a node that have been started without joining the ring to join it
public void joinRing() throws IOException;
diff --git a/src/java/org/apache/cassandra/transport/CQLMessageHandler.java
b/src/java/org/apache/cassandra/transport/CQLMessageHandler.java
index a9dba8b..bbb8cb5 100644
--- a/src/java/org/apache/cassandra/transport/CQLMessageHandler.java
+++ b/src/java/org/apache/cassandra/transport/CQLMessageHandler.java
@@ -23,6 +23,8 @@ import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import com.google.common.primitives.Ints;
+import org.apache.cassandra.transport.ClientResourceLimits.Overload;
+import org.apache.cassandra.utils.concurrent.NonBlockingRateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,7 +44,6 @@ import org.apache.cassandra.net.ResourceLimits.Limit;
import org.apache.cassandra.net.ShareableBytes;
import org.apache.cassandra.transport.Flusher.FlushItem.Framed;
import org.apache.cassandra.transport.messages.ErrorMessage;
-import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.NoSpamLogger;
import static org.apache.cassandra.utils.MonotonicClock.approxTime;
@@ -78,6 +79,7 @@ public class CQLMessageHandler<M extends Message> extends
AbstractMessageHandler
private static final NoSpamLogger noSpamLogger =
NoSpamLogger.getLogger(logger, 1L, TimeUnit.SECONDS);
public static final int LARGE_MESSAGE_THRESHOLD =
FrameEncoder.Payload.MAX_SIZE - 1;
+ public static final TimeUnit RATE_LIMITER_DELAY_UNIT =
TimeUnit.NANOSECONDS;
private final Envelope.Decoder envelopeDecoder;
private final Message.Decoder<M> messageDecoder;
@@ -86,13 +88,14 @@ public class CQLMessageHandler<M extends Message> extends
AbstractMessageHandler
private final ErrorHandler errorHandler;
private final boolean throwOnOverload;
private final ProtocolVersion version;
+ private final NonBlockingRateLimiter requestRateLimiter;
long channelPayloadBytesInFlight;
private int consecutiveMessageErrors = 0;
interface MessageConsumer<M extends Message>
{
- void accept(Channel channel, M message, Dispatcher.FlushItemConverter
toFlushItem);
+ void accept(Channel channel, M message, Dispatcher.FlushItemConverter
toFlushItem, Overload backpressure);
}
interface ErrorHandler
@@ -129,6 +132,7 @@ public class CQLMessageHandler<M extends Message> extends
AbstractMessageHandler
this.errorHandler = errorHandler;
this.throwOnOverload = throwOnOverload;
this.version = version;
+ this.requestRateLimiter = resources.requestRateLimiter();
}
@Override
@@ -139,6 +143,23 @@ public class CQLMessageHandler<M extends Message> extends
AbstractMessageHandler
return super.process(frame);
}
+ /**
+ * Checks limits on bytes in flight and the request rate limiter (if
enabled), then takes one of three actions:
+ *
+ * 1.) If no limits are breached, process the request.
+ * 2.) If a limit is breached, and the connection is configured to throw
on overload, throw {@link OverloadedException}.
+ * 3.) If a limit is breached, and the connection is not configured to throw, process the request, and return false
+ * to let the {@link FrameDecoder} know it should stop processing frames.
+ *
+ * If the connection is configured to throw {@link OverloadedException},
requests that breach the rate limit are
+ * not counted against that limit.
+ *
+ * @return true if the {@link FrameDecoder} should continue to process
incoming frames, and false if it should stop
+ * processing them, effectively applying backpressure to clients
+ *
+ * @throws ErrorMessage.WrappedException with an {@link
OverloadedException} if overload occurs and the
+ * connection is configured to throw on overload
+ */
protected boolean processOneContainedMessage(ShareableBytes bytes, Limit
endpointReserve, Limit globalReserve)
{
ByteBuffer buf = bytes.get();
@@ -157,41 +178,108 @@ public class CQLMessageHandler<M extends Message>
extends AbstractMessageHandler
// max CQL message size defaults to 256mb, so should be safe to
downcast
int messageSize = Ints.checkedCast(header.bodySizeInBytes);
+
if (throwOnOverload)
{
if (!acquireCapacity(header, endpointReserve, globalReserve))
{
- // discard the request and throw an exception
- ClientMetrics.instance.markRequestDiscarded();
- logger.trace("Discarded request of size: {}.
InflightChannelRequestPayload: {}, " +
- "InflightEndpointRequestPayload: {},
InflightOverallRequestPayload: {}, Header: {}",
- messageSize,
- channelPayloadBytesInFlight,
- endpointReserve.using(),
- globalReserve.using(),
- header);
-
- handleError(new OverloadedException("Server is in overloaded
state. " +
- "Cannot accept more
requests at this point"), header);
-
- // Don't stop processing incoming messages, rely on the client
to apply
- // backpressure when it receives OverloadedException
- // but discard this message as we're responding with the
overloaded error
- incrementReceivedMessageMetrics(messageSize);
- buf.position(buf.position() + Envelope.Header.LENGTH +
messageSize);
+ discardAndThrow(endpointReserve, globalReserve, buf, header,
messageSize, Overload.BYTES_IN_FLIGHT);
+ return true;
+ }
+
+ if (DatabaseDescriptor.getNativeTransportRateLimitingEnabled() &&
!requestRateLimiter.tryReserve())
+ {
+ // We've already allocated against the bytes-in-flight limits,
so release those resources.
+ release(header);
+ discardAndThrow(endpointReserve, globalReserve, buf, header,
messageSize, Overload.REQUESTS);
return true;
}
}
- else if (!acquireCapacityAndQueueOnFailure(header, endpointReserve,
globalReserve))
+ else
{
- // set backpressure on the channel, queuing the request until we
have capacity
- ClientMetrics.instance.pauseConnection();
- return false;
+ Overload backpressure = Overload.NONE;
+
+ if (!acquireCapacityAndQueueOnFailure(header, endpointReserve,
globalReserve))
+ {
+ if (processRequestAndUpdateMetrics(bytes, header, messageSize,
Overload.BYTES_IN_FLIGHT))
+ {
+ if (decoder.isActive())
+ ClientMetrics.instance.pauseConnection();
+ }
+
+ backpressure = Overload.BYTES_IN_FLIGHT;
+ }
+
+ if (DatabaseDescriptor.getNativeTransportRateLimitingEnabled())
+ {
+ // Reserve a permit even if we've already triggered
backpressure on bytes in flight.
+ long delay =
requestRateLimiter.reserveAndGetDelay(RATE_LIMITER_DELAY_UNIT);
+
+ if (backpressure == Overload.NONE && delay > 0)
+ {
+ if (processRequestAndUpdateMetrics(bytes, header,
messageSize, Overload.REQUESTS))
+ {
+ if (decoder.isActive())
+ ClientMetrics.instance.pauseConnection();
+
+ // Schedule a wakeup here if we processed successfully. The connection should be closing otherwise.
+ scheduleConnectionWakeupTask(delay,
RATE_LIMITER_DELAY_UNIT);
+ }
+
+ backpressure = Overload.REQUESTS;
+ }
+ }
+
+ // If we triggered backpressure, make sure the caller stops
processing frames after the request completes.
+ if (backpressure != Overload.NONE)
+ return false;
}
+ return processRequestAndUpdateMetrics(bytes, header, messageSize,
Overload.NONE);
+ }
+
+ private boolean processRequestAndUpdateMetrics(ShareableBytes bytes,
Envelope.Header header, int messageSize, Overload backpressure)
+ {
channelPayloadBytesInFlight += messageSize;
incrementReceivedMessageMetrics(messageSize);
- return processRequest(composeRequest(header, bytes));
+ return processRequest(composeRequest(header, bytes), backpressure);
+ }
+
+ private void discardAndThrow(Limit endpointReserve, Limit globalReserve,
+ ByteBuffer buf, Envelope.Header header, int
messageSize,
+ Overload overload)
+ {
+ ClientMetrics.instance.markRequestDiscarded();
+ logOverload(endpointReserve, globalReserve, header, messageSize);
+
+ OverloadedException exception =
buildOverloadedException(endpointReserve, globalReserve, overload);
+ handleError(exception, header);
+
+ // Don't stop processing incoming messages, as we rely on the client
to apply
+ // backpressure when it receives OverloadedException, but discard this
message
+ // as we're responding with the overloaded error.
+ incrementReceivedMessageMetrics(messageSize);
+ buf.position(buf.position() + Envelope.Header.LENGTH + messageSize);
+ }
+
+ private OverloadedException buildOverloadedException(Limit
endpointReserve, Limit globalReserve, Overload overload) {
+ return overload == Overload.REQUESTS
+ ? new OverloadedException(String.format("Request breached
global limit of %d requests/second. Server is " +
+ "currently in an
overloaded state and cannot accept more requests.",
+
requestRateLimiter.getRate()))
+ : new OverloadedException(String.format("Request breached
limit on bytes in flight. (Endpoint: %d/%d bytes, Global: %d/%d bytes.) " +
+ "Server is currently
in an overloaded state and cannot accept more requests.",
+
endpointReserve.using(), endpointReserve.limit(), globalReserve.using(),
globalReserve.limit()));
+ }
+
+ private void logOverload(Limit endpointReserve, Limit globalReserve,
Envelope.Header header, int messageSize)
+ {
+ logger.trace("Discarded request of size {} with {} bytes in flight on
channel. " +
+ "Using {}/{} bytes of endpoint limit and {}/{} bytes of
global limit. " +
+ "Global rate limiter: {} Header: {}",
+ messageSize, channelPayloadBytesInFlight,
+ endpointReserve.using(), endpointReserve.limit(),
globalReserve.using(), globalReserve.limit(),
+ requestRateLimiter, header);
}
private boolean handleProtocolException(ProtocolException exception,
@@ -251,11 +339,17 @@ public class CQLMessageHandler<M extends Message> extends
AbstractMessageHandler
protected boolean processRequest(Envelope request)
{
+ return processRequest(request, Overload.NONE);
+ }
+
+ protected boolean processRequest(Envelope request, Overload backpressure)
+ {
M message = null;
try
{
message = messageDecoder.decode(channel, request);
- dispatcher.accept(channel, message, this::toFlushItem);
+ dispatcher.accept(channel, message, this::toFlushItem,
backpressure);
+
// sucessfully delivered a CQL message to the execution
// stage, so reset the counter of consecutive errors
consecutiveMessageErrors = 0;
@@ -293,8 +387,6 @@ public class CQLMessageHandler<M extends Message> extends
AbstractMessageHandler
* to the client.
* This also releases the capacity acquired for processing as
* indicated by supplied header.
- * @param t
- * @param header
*/
private void handleErrorAndRelease(Throwable t, Envelope.Header header)
{
@@ -311,8 +403,6 @@ public class CQLMessageHandler<M extends Message> extends
AbstractMessageHandler
* when an error occurs without any capacity being acquired.
* Typically, this would be the result of an acquisition failure
* if the THROW_ON_OVERLOAD option has been specified by the client.
- * @param t
- * @param header
*/
private void handleError(Throwable t, Envelope.Header header)
{
@@ -328,8 +418,6 @@ public class CQLMessageHandler<M extends Message> extends
AbstractMessageHandler
* when an error occurs without any capacity being acquired.
* Typically, this would be the result of an acquisition failure
* if the THROW_ON_OVERLOAD option has been specified by the client.
- * @param t
- * @param streamId
*/
private void handleError(Throwable t, int streamId)
{
@@ -342,8 +430,6 @@ public class CQLMessageHandler<M extends Message> extends
AbstractMessageHandler
* payload fails. This does not attempt to release any resources, as these
errors
* should only occur before any capacity acquisition is attempted (e.g. on
receipt
* of a corrupt frame, or failure to extract a CQL message from the
envelope).
- *
- * @param t
*/
private void handleError(Throwable t)
{
@@ -405,8 +491,9 @@ public class CQLMessageHandler<M extends Message> extends
AbstractMessageHandler
// max CQL message size defaults to 256mb, so should be safe to
downcast
int messageSize = Ints.checkedCast(header.bodySizeInBytes);
receivedBytes += buf.remaining();
-
+
LargeMessage largeMessage = new LargeMessage(header);
+
if (!acquireCapacity(header, endpointReserve, globalReserve))
{
// In the case of large messages, never stop processing
incoming frames
@@ -424,21 +511,49 @@ public class CQLMessageHandler<M extends Message> extends
AbstractMessageHandler
// concurrently.
if (throwOnOverload)
{
- // discard the request and throw an exception
+ // Mark as overloaded so that we discard the message after consuming any subsequent frames.
ClientMetrics.instance.markRequestDiscarded();
- logger.trace("Discarded request of size: {}.
InflightChannelRequestPayload: {}, " +
- "InflightEndpointRequestPayload: {},
InflightOverallRequestPayload: {}, Header: {}",
- messageSize,
- channelPayloadBytesInFlight,
- endpointReserve.using(),
- globalReserve.using(),
- header);
-
- // mark as overloaded so that discard the message
- // after consuming any subsequent frames
- largeMessage.markOverloaded();
+ logOverload(endpointReserve, globalReserve, header,
messageSize);
+ largeMessage.markOverloaded(Overload.BYTES_IN_FLIGHT);
}
}
+ else if
(DatabaseDescriptor.getNativeTransportRateLimitingEnabled())
+ {
+ if (throwOnOverload)
+ {
+ if (!requestRateLimiter.tryReserve())
+ {
+ ClientMetrics.instance.markRequestDiscarded();
+ logOverload(endpointReserve, globalReserve, header,
messageSize);
+
+ // Mark as overloaded so that we discard the message
after consuming any subsequent frames.
+ // (i.e. request resources we may already have acquired above will be released.)
+ largeMessage.markOverloaded(Overload.REQUESTS);
+
+ this.largeMessage = largeMessage;
+ largeMessage.supply(frame);
+ return true;
+ }
+ }
+ else
+ {
+ long delay =
requestRateLimiter.reserveAndGetDelay(RATE_LIMITER_DELAY_UNIT);
+
+ if (delay > 0)
+ {
+ this.largeMessage = largeMessage;
+ largeMessage.markBackpressure(Overload.REQUESTS);
+ largeMessage.supply(frame);
+
+ if (decoder.isActive())
+ ClientMetrics.instance.pauseConnection();
+
+ scheduleConnectionWakeupTask(delay,
RATE_LIMITER_DELAY_UNIT);
+ return false;
+ }
+ }
+ }
+
this.largeMessage = largeMessage;
largeMessage.supply(frame);
return true;
@@ -454,6 +569,31 @@ public class CQLMessageHandler<M extends Message> extends
AbstractMessageHandler
return channel.id().asShortText();
}
+ private void scheduleConnectionWakeupTask(long waitLength, TimeUnit unit)
+ {
+ channel.eventLoop().schedule(() ->
+ {
+ try
+ {
+ // We might have already reactivated via another wakeup task.
+ if (!decoder.isActive())
+ {
+ decoder.reactivate();
+
+ // Only update the relevant
metric if we've actually activated.
+ if (decoder.isActive())
+
ClientMetrics.instance.unpauseConnection();
+ }
+ }
+ catch (Throwable t)
+ {
+ fatalExceptionCaught(t);
+ }
+ },
+ waitLength,
+ unit);
+ }
+
@SuppressWarnings("BooleanMethodIsAlwaysInverted")
private boolean acquireCapacityAndQueueOnFailure(Envelope.Header header,
Limit endpointReserve, Limit globalReserve)
{
@@ -515,7 +655,8 @@ public class CQLMessageHandler<M extends Message> extends
AbstractMessageHandler
{
private static final long EXPIRES_AT = Long.MAX_VALUE;
- private boolean overloaded = false;
+ private Overload overload = Overload.NONE;
+ private Overload backpressure = Overload.NONE;
private LargeMessage(Envelope.Header header)
{
@@ -541,19 +682,22 @@ public class CQLMessageHandler<M extends Message> extends
AbstractMessageHandler
* so we must ensure that subsequent frames are consumed from the
channel. At that
* point an error response is returned to the client, rather than
processing the message.
*/
- private void markOverloaded()
+ private void markOverloaded(Overload overload)
+ {
+ this.overload = overload;
+ }
+
+ private void markBackpressure(Overload backpressure)
{
- overloaded = true;
+ this.backpressure = backpressure;
}
protected void onComplete()
{
- if (overloaded)
- handleErrorAndRelease(new OverloadedException("Server is in
overloaded state. " +
- "Cannot accept
more requests at this point"), header);
+ if (overload != Overload.NONE)
+
handleErrorAndRelease(buildOverloadedException(endpointReserveCapacity,
globalReserveCapacity, overload), header);
else if (!isCorrupt)
- processRequest(assembleFrame());
-
+ processRequest(assembleFrame(), backpressure);
}
protected void abort()
diff --git a/src/java/org/apache/cassandra/transport/ClientResourceLimits.java
b/src/java/org/apache/cassandra/transport/ClientResourceLimits.java
index 17a6e59..f9ed692 100644
--- a/src/java/org/apache/cassandra/transport/ClientResourceLimits.java
+++ b/src/java/org/apache/cassandra/transport/ClientResourceLimits.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.cassandra.utils.concurrent.NonBlockingRateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,6 +43,10 @@ public class ClientResourceLimits
private static final AbstractMessageHandler.WaitQueue GLOBAL_QUEUE =
AbstractMessageHandler.WaitQueue.global(GLOBAL_LIMIT);
private static final ConcurrentMap<InetAddress, Allocator>
PER_ENDPOINT_ALLOCATORS = new ConcurrentHashMap<>();
+ public static final NonBlockingRateLimiter GLOBAL_REQUEST_LIMITER = new
NonBlockingRateLimiter(getNativeTransportMaxRequestsPerSecond());
+
+ public enum Overload { NONE, REQUESTS, BYTES_IN_FLIGHT }
+
public static Allocator getAllocatorForEndpoint(InetAddress endpoint)
{
while (true)
@@ -95,6 +100,19 @@ public class ClientResourceLimits
return histogram.getSnapshot();
}
+ public static int getNativeTransportMaxRequestsPerSecond()
+ {
+ return DatabaseDescriptor.getNativeTransportMaxRequestsPerSecond();
+ }
+
+ public static void setNativeTransportMaxRequestsPerSecond(int newPerSecond)
+ {
+ int existingPerSecond = getNativeTransportMaxRequestsPerSecond();
+
DatabaseDescriptor.setNativeTransportMaxRequestsPerSecond(newPerSecond);
+ GLOBAL_REQUEST_LIMITER.setRate(newPerSecond);
+ logger.info("Changed native_transport_max_requests_per_second from {}
to {}", existingPerSecond, newPerSecond);
+ }
+
/**
* This will recompute the ip usage histo on each query of the snapshot
when requested instead of trying to keep
* a histogram up to date with each request
@@ -166,8 +184,8 @@ public class ClientResourceLimits
*
* @param amount number permits to allocate
* @return outcome SUCCESS if the allocation was successful. In the
case of failure,
- * either INSUFFICIENT_GLOBAL or INSUFFICIENT_ENPOINT to indicate
which reserve rejected
- * the allocation request.
+ * either INSUFFICIENT_GLOBAL or INSUFFICIENT_ENDPOINT to indicate
which
+ * reserve rejected the allocation request.
*/
ResourceLimits.Outcome tryAllocate(long amount)
{
@@ -211,19 +229,18 @@ public class ClientResourceLimits
return endpointAndGlobal.global().using();
}
+ @Override
public String toString()
{
- return String.format("InflightEndpointRequestPayload: %d/%d,
InflightOverallRequestPayload: %d/%d",
- endpointAndGlobal.endpoint().using(),
- endpointAndGlobal.endpoint().limit(),
- endpointAndGlobal.global().using(),
- endpointAndGlobal.global().limit());
+ return String.format("Using %d/%d bytes of endpoint limit and
%d/%d bytes of global limit.",
+ endpointAndGlobal.endpoint().using(),
endpointAndGlobal.endpoint().limit(),
+ endpointAndGlobal.global().using(),
endpointAndGlobal.global().limit());
}
}
/**
* Used in protocol V5 and later by the
AbstractMessageHandler/CQLMessageHandler hierarchy.
- * This hides the allocate/tryAllocate/release methods from
EndpointResourceLimits and exposes
+ * This hides the allocate/tryAllocate/release methods from {@link
ClientResourceLimits} and exposes
* the endpoint and global limits, along with their corresponding
* {@link org.apache.cassandra.net.AbstractMessageHandler.WaitQueue}
directly.
* Provided as an interface and single implementation for testing (see
CQLConnectionTest)
@@ -234,9 +251,10 @@ public class ClientResourceLimits
AbstractMessageHandler.WaitQueue globalWaitQueue();
ResourceLimits.Limit endpointLimit();
AbstractMessageHandler.WaitQueue endpointWaitQueue();
+ NonBlockingRateLimiter requestRateLimiter();
void release();
- static class Default implements ResourceProvider
+ class Default implements ResourceProvider
{
private final Allocator limits;
@@ -265,6 +283,11 @@ public class ClientResourceLimits
return limits.waitQueue;
}
+ public NonBlockingRateLimiter requestRateLimiter()
+ {
+ return GLOBAL_REQUEST_LIMITER;
+ }
+
public void release()
{
limits.release();
diff --git a/src/java/org/apache/cassandra/transport/Dispatcher.java
b/src/java/org/apache/cassandra/transport/Dispatcher.java
index 05b55e8..9a9cdce 100644
--- a/src/java/org/apache/cassandra/transport/Dispatcher.java
+++ b/src/java/org/apache/cassandra/transport/Dispatcher.java
@@ -20,8 +20,12 @@ package org.apache.cassandra.transport;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.util.AttributeKey;
@@ -30,15 +34,19 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.net.FrameEncoder;
import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.transport.ClientResourceLimits.Overload;
import org.apache.cassandra.transport.Flusher.FlushItem;
import org.apache.cassandra.transport.messages.ErrorMessage;
import org.apache.cassandra.transport.messages.EventMessage;
import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.NoSpamLogger;
import static org.apache.cassandra.concurrent.SharedExecutorPool.SHARED;
public class Dispatcher
{
+ private static final Logger logger =
LoggerFactory.getLogger(Dispatcher.class);
+
private static final LocalAwareExecutorService requestExecutor =
SHARED.newExecutor(DatabaseDescriptor.getNativeTransportMaxThreads(),
DatabaseDescriptor::setNativeTransportMaxThreads,
"transport",
@@ -65,20 +73,37 @@ public class Dispatcher
this.useLegacyFlusher = useLegacyFlusher;
}
- public void dispatch(Channel channel, Message.Request request,
FlushItemConverter forFlusher)
+ public void dispatch(Channel channel, Message.Request request,
FlushItemConverter forFlusher, Overload backpressure)
{
- requestExecutor.submit(() -> processRequest(channel, request,
forFlusher));
+ requestExecutor.submit(() -> processRequest(channel, request,
forFlusher, backpressure));
}
/**
* Note: this method may be executed on the netty event loop, during
initial protocol negotiation
*/
- static Message.Response processRequest(ServerConnection connection,
Message.Request request)
+ static Message.Response processRequest(ServerConnection connection,
Message.Request request, Overload backpressure)
{
long queryStartNanoTime = System.nanoTime();
if (connection.getVersion().isGreaterOrEqualTo(ProtocolVersion.V4))
ClientWarn.instance.captureWarnings();
+ if (backpressure == Overload.REQUESTS)
+ {
+ String message = String.format("Request breached global limit of
%d requests/second and triggered backpressure.",
+
ClientResourceLimits.getNativeTransportMaxRequestsPerSecond());
+
+ NoSpamLogger.log(logger, NoSpamLogger.Level.INFO, 1,
TimeUnit.MINUTES, message);
+ ClientWarn.instance.warn(message);
+ }
+ else if (backpressure == Overload.BYTES_IN_FLIGHT)
+ {
+ String message = String.format("Request breached limit(s) on bytes
in flight (Endpoint: %d, Global: %d) and triggered backpressure.",
+
ClientResourceLimits.getEndpointLimit(), ClientResourceLimits.getGlobalLimit());
+
+ NoSpamLogger.log(logger, NoSpamLogger.Level.INFO, 1,
TimeUnit.MINUTES, message);
+ ClientWarn.instance.warn(message);
+ }
+
QueryState qstate = connection.validateNewMessage(request.type,
connection.getVersion());
Message.logger.trace("Received: {}, v={}", request,
connection.getVersion());
@@ -94,7 +119,7 @@ public class Dispatcher
/**
* Note: this method is not expected to execute on the netty event loop.
*/
- void processRequest(Channel channel, Message.Request request,
FlushItemConverter forFlusher)
+ void processRequest(Channel channel, Message.Request request,
FlushItemConverter forFlusher, Overload backpressure)
{
final Message.Response response;
final ServerConnection connection;
@@ -103,7 +128,7 @@ public class Dispatcher
{
assert request.connection() instanceof ServerConnection;
connection = (ServerConnection) request.connection();
- response = processRequest(connection, request);
+ response = processRequest(connection, request, backpressure);
toFlush = forFlusher.toFlushItem(channel, request, response);
Message.logger.trace("Responding: {}, v={}", response,
connection.getVersion());
}
@@ -152,7 +177,7 @@ public class Dispatcher
* for delivering events to registered clients is dependent on protocol
version and the configuration
* of the pipeline. For v5 and newer connections, the event message is
encoded into an Envelope,
* wrapped in a FlushItem and then delivered via the pipeline's flusher,
in a similar way to
- * a Response returned from {@link #processRequest(Channel,
Message.Request, FlushItemConverter)}.
+ * a Response returned from {@link #processRequest(Channel,
Message.Request, FlushItemConverter, Overload)}.
* It's worth noting that events are not generally fired as a direct
response to a client request,
* so this flush item has a null request attribute. The dispatcher itself
is created when the
* pipeline is first configured during protocol negotiation and is
attached to the channel for
diff --git a/src/java/org/apache/cassandra/transport/ExceptionHandlers.java
b/src/java/org/apache/cassandra/transport/ExceptionHandlers.java
index 8ad30ce..63951e9 100644
--- a/src/java/org/apache/cassandra/transport/ExceptionHandlers.java
+++ b/src/java/org/apache/cassandra/transport/ExceptionHandlers.java
@@ -32,6 +32,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
+import org.apache.cassandra.exceptions.OverloadedException;
import org.apache.cassandra.metrics.ClientMetrics;
import org.apache.cassandra.net.FrameEncoder;
import org.apache.cassandra.transport.messages.ErrorMessage;
@@ -87,22 +88,8 @@ public class ExceptionHandlers
JVMStabilityInspector.inspectThrowable(cause);
}
}
- if (Throwables.anyCauseMatches(cause, t -> t instanceof
ProtocolException))
- {
- // if any ProtocolExceptions is not silent, then handle
- if (Throwables.anyCauseMatches(cause, t -> t instanceof
ProtocolException && !((ProtocolException) t).isSilent()))
- {
- ClientMetrics.instance.markProtocolException();
- // since protocol exceptions are expected to be client
issues, not logging stack trace
- // to avoid spamming the logs once a bad client shows up
- NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1,
TimeUnit.MINUTES, "Protocol exception with client networking: " +
cause.getMessage());
- }
- }
- else
- {
- ClientMetrics.instance.markUnknownException();
- logger.warn("Unknown exception in client networking", cause);
- }
+
+ logClientNetworkingExceptions(cause);
}
private static boolean isFatal(Throwable cause)
@@ -112,6 +99,31 @@ public class ExceptionHandlers
}
}
+ static void logClientNetworkingExceptions(Throwable cause)
+ {
+ if (Throwables.anyCauseMatches(cause, t -> t instanceof
ProtocolException))
+ {
+ // if any ProtocolException is not silent, then handle it
+ if (Throwables.anyCauseMatches(cause, t -> t instanceof
ProtocolException && !((ProtocolException) t).isSilent()))
+ {
+ ClientMetrics.instance.markProtocolException();
+ // since protocol exceptions are expected to be client issues,
not logging stack trace
+ // to avoid spamming the logs once a bad client shows up
+ NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1,
TimeUnit.MINUTES, "Protocol exception with client networking: " +
cause.getMessage());
+ }
+ }
+ else if (Throwables.anyCauseMatches(cause, t -> t instanceof
OverloadedException))
+ {
+ // Once the overload threshold is breached, these exceptions will very
likely flood the logs, so rate-limit the message.
+ NoSpamLogger.log(logger, NoSpamLogger.Level.INFO, 1,
TimeUnit.MINUTES, cause.getMessage());
+ }
+ else
+ {
+ ClientMetrics.instance.markUnknownException();
+ logger.warn("Unknown exception in client networking", cause);
+ }
+ }
+
/**
* Include the channel info in the logged information for unexpected
errors, and (if {@link #alwaysLogAtError} is
* false then choose the log level based on the type of exception (some
are clearly client issues and shouldn't be
diff --git
a/src/java/org/apache/cassandra/transport/InitialConnectionHandler.java
b/src/java/org/apache/cassandra/transport/InitialConnectionHandler.java
index 77e9232..f3c050d 100644
--- a/src/java/org/apache/cassandra/transport/InitialConnectionHandler.java
+++ b/src/java/org/apache/cassandra/transport/InitialConnectionHandler.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.cassandra.transport.ClientResourceLimits.Overload;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -147,7 +148,7 @@ public class InitialConnectionHandler extends
ByteToMessageDecoder
promise = new VoidChannelPromise(ctx.channel(), false);
}
- final Message.Response response =
Dispatcher.processRequest((ServerConnection) connection, startup);
+ final Message.Response response =
Dispatcher.processRequest((ServerConnection) connection, startup,
Overload.NONE);
outbound = response.encode(inbound.header.version);
ctx.writeAndFlush(outbound, promise);
logger.debug("Configured pipeline: {}", ctx.pipeline());
diff --git a/src/java/org/apache/cassandra/transport/PreV5Handlers.java
b/src/java/org/apache/cassandra/transport/PreV5Handlers.java
index cec8edd..f45028d 100644
--- a/src/java/org/apache/cassandra/transport/PreV5Handlers.java
+++ b/src/java/org/apache/cassandra/transport/PreV5Handlers.java
@@ -19,8 +19,9 @@
package org.apache.cassandra.transport;
import java.util.List;
-import java.util.concurrent.TimeUnit;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.transport.ClientResourceLimits.Overload;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,10 +40,9 @@ import org.apache.cassandra.metrics.ClientMetrics;
import org.apache.cassandra.net.ResourceLimits;
import org.apache.cassandra.transport.messages.ErrorMessage;
import org.apache.cassandra.utils.JVMStabilityInspector;
-import org.apache.cassandra.utils.NoSpamLogger;
-import org.apache.cassandra.utils.Throwables;
-import static org.apache.cassandra.transport.Message.logger;
+import static
org.apache.cassandra.transport.CQLMessageHandler.RATE_LIMITER_DELAY_UNIT;
+import static
org.apache.cassandra.transport.ClientResourceLimits.GLOBAL_REQUEST_LIMITER;
public class PreV5Handlers
{
@@ -63,7 +63,9 @@ public class PreV5Handlers
* Note: should only be accessed while on the netty event loop.
*/
private long channelPayloadBytesInFlight;
- private boolean paused;
+
+ /** The cause of the current connection pause, or {@link
Overload#NONE} if it is unpaused. */
+ private Overload backpressure = Overload.NONE;
LegacyDispatchHandler(Dispatcher dispatcher,
ClientResourceLimits.Allocator endpointPayloadTracker)
{
@@ -71,11 +73,12 @@ public class PreV5Handlers
this.endpointPayloadTracker = endpointPayloadTracker;
}
- protected void channelRead0(ChannelHandlerContext ctx, Message.Request
request) throws Exception
+ protected void channelRead0(ChannelHandlerContext ctx, Message.Request
request)
{
- // if we decide to handle this message, process it outside of the
netty event loop
- if (shouldHandleRequest(ctx, request))
- dispatcher.dispatch(ctx.channel(), request, this::toFlushItem);
+ // The only reason we won't process this message is if
checkLimits() throws an OverloadedException.
+ // (i.e. Even if backpressure is applied, the current request is
allowed to finish.)
+ checkLimits(ctx, request);
+ dispatcher.dispatch(ctx.channel(), request, this::toFlushItem,
backpressure);
}
// Acts as a Dispatcher.FlushItemConverter
@@ -95,76 +98,143 @@ public class PreV5Handlers
// since the request has been processed, decrement inflight
payload at channel, endpoint and global levels
channelPayloadBytesInFlight -= itemSize;
- ResourceLimits.Outcome endpointGlobalReleaseOutcome =
endpointPayloadTracker.release(itemSize);
+ boolean globalInFlightBytesBelowLimit =
endpointPayloadTracker.release(itemSize) == ResourceLimits.Outcome.BELOW_LIMIT;
- // now check to see if we need to reenable the channel's autoRead.
- // If the current payload side is zero, we must reenable autoread
as
+ // Now check to see if we need to reenable the channel's autoRead.
+ //
+ // If the current payload bytes in flight is zero, we must
reenable autoread as
// 1) we allow no other thread/channel to do it, and
- // 2) there's no other events following this one (becuase we're at
zero bytes in flight),
- // so no successive to trigger the other clause in this if-block
+ // 2) there are no other events following this one (because we're
at zero bytes in flight),
+ // so no subsequent event will trigger the other clause in this if-block.
+ //
+ // The only exception to this is if the global request rate limit
has been breached, which means
+ // we'll have to wait until a scheduled wakeup task unpauses the
connection.
//
- // note: this path is only relevant when part of a pre-V5
pipeline, as only in this case is
+ // Note: This path is only relevant when part of a pre-V5
pipeline, as only in this case is
// paused ever set to true. In pipelines configured for V5 or
later, backpressure and control
// over the inbound pipeline's autoread status are handled by the
FrameDecoder/FrameProcessor.
ChannelConfig config = item.channel.config();
- if (paused && (channelPayloadBytesInFlight == 0 ||
endpointGlobalReleaseOutcome == ResourceLimits.Outcome.BELOW_LIMIT))
+
+ if (backpressure == Overload.BYTES_IN_FLIGHT &&
(channelPayloadBytesInFlight == 0 || globalInFlightBytesBelowLimit))
{
- paused = false;
- ClientMetrics.instance.unpauseConnection();
- config.setAutoRead(true);
+ unpauseConnection(config);
}
}
/**
- * This check for inflight payload to potentially discard the request
should have been ideally in one of the
- * first handlers in the pipeline (Envelope.Decoder::decode()).
However, incase of any exception thrown between that
- * handler (where inflight payload is incremented) and this handler
(Dispatcher::channelRead0) (where inflight
- * payload in decremented), inflight payload becomes erroneous.
ExceptionHandler is not sufficient for this
- * purpose since it does not have the message envelope associated with
the exception.
+ * Checks limits on bytes in flight and the request rate limiter (if
enabled) to determine whether to drop a
+ * request or trigger backpressure and pause the connection.
+ * <p>
+ * The check for inflight payload to potentially discard the request
should have been ideally in one of the
+ * first handlers in the pipeline (Envelope.Decoder::decode()).
However, in case of any exception thrown between
+ * that handler (where inflight payload is incremented) and this
handler (Dispatcher::channelRead0) (where
+ * inflight payload in decremented), inflight payload becomes
erroneous. ExceptionHandler is not sufficient for
+ * this purpose since it does not have the message envelope associated
with the exception.
+ * <p>
+ * If the connection is configured to throw {@link
OverloadedException}, requests that breach the rate limit are
+ * not counted against that limit.
* <p>
* Note: this method should execute on the netty event loop.
+ *
+ * @throws ErrorMessage.WrappedException with an {@link
OverloadedException} if overload occurs and the
+ * connection is configured to throw on overload
*/
- private boolean shouldHandleRequest(ChannelHandlerContext ctx,
Message.Request request)
+ private void checkLimits(ChannelHandlerContext ctx, Message.Request
request)
{
long requestSize = request.getSource().header.bodySizeInBytes;
-
- // check for overloaded state by trying to allocate the message
size from inflight payload trackers
- if (endpointPayloadTracker.tryAllocate(requestSize) !=
ResourceLimits.Outcome.SUCCESS)
+
+ if (request.connection.isThrowOnOverload())
{
- if (request.connection.isThrowOnOverload())
+ if (endpointPayloadTracker.tryAllocate(requestSize) !=
ResourceLimits.Outcome.SUCCESS)
+ {
+ discardAndThrow(request, requestSize,
Overload.BYTES_IN_FLIGHT);
+ }
+
+ if (DatabaseDescriptor.getNativeTransportRateLimitingEnabled()
&& !GLOBAL_REQUEST_LIMITER.tryReserve())
{
- // discard the request and throw an exception
- ClientMetrics.instance.markRequestDiscarded();
- logger.trace("Discarded request of size: {}.
InflightChannelRequestPayload: {}, {}, Request: {}",
- requestSize,
- channelPayloadBytesInFlight,
- endpointPayloadTracker.toString(),
- request);
- throw ErrorMessage.wrap(new OverloadedException("Server is
in overloaded state. Cannot accept more requests at this point"),
-
request.getSource().header.streamId);
+ // We've already allocated against the payload tracker
here, so release those resources.
+ endpointPayloadTracker.release(requestSize);
+ discardAndThrow(request, requestSize, Overload.REQUESTS);
}
- else
+ }
+ else
+ {
+ // Any request that gets here will be processed, so increment
the channel bytes in flight.
+ channelPayloadBytesInFlight += requestSize;
+
+ // Check for overloaded state by trying to allocate the
message size from inflight payload trackers
+ if (endpointPayloadTracker.tryAllocate(requestSize) !=
ResourceLimits.Outcome.SUCCESS)
{
- // set backpressure on the channel, and handle the request
endpointPayloadTracker.allocate(requestSize);
- ctx.channel().config().setAutoRead(false);
- ClientMetrics.instance.pauseConnection();
- paused = true;
+ pauseConnection(ctx);
+ backpressure = Overload.BYTES_IN_FLIGHT;
+ }
+
+ if (DatabaseDescriptor.getNativeTransportRateLimitingEnabled())
+ {
+ // Reserve a permit even if we've already triggered
backpressure on bytes in flight.
+ long delay =
GLOBAL_REQUEST_LIMITER.reserveAndGetDelay(RATE_LIMITER_DELAY_UNIT);
+
+ // If we've already triggered backpressure on bytes in
flight, no further action is necessary.
+ if (backpressure == Overload.NONE && delay > 0)
+ {
+ pauseConnection(ctx);
+
+ // A permit isn't immediately available, so schedule
an unpause for when it is.
+ ctx.channel().eventLoop().schedule(() ->
unpauseConnection(ctx.channel().config()), delay, RATE_LIMITER_DELAY_UNIT);
+ backpressure = Overload.REQUESTS;
+ }
}
}
+ }
+
+ private void pauseConnection(ChannelHandlerContext ctx)
+ {
+ if (ctx.channel().config().isAutoRead())
+ {
+ ctx.channel().config().setAutoRead(false);
+ ClientMetrics.instance.pauseConnection();
+ }
+ }
- channelPayloadBytesInFlight += requestSize;
- return true;
+ private void unpauseConnection(ChannelConfig config)
+ {
+ backpressure = Overload.NONE;
+
+ if (!config.isAutoRead())
+ {
+ ClientMetrics.instance.unpauseConnection();
+ config.setAutoRead(true);
+ }
}
+ private void discardAndThrow(Message.Request request, long
requestSize, Overload overload)
+ {
+ ClientMetrics.instance.markRequestDiscarded();
+
+ logger.trace("Discarded request of size {} with {} bytes in flight
on channel. {} " +
+ "Global rate limiter: {} Request: {}",
+ requestSize, channelPayloadBytesInFlight,
endpointPayloadTracker,
+ GLOBAL_REQUEST_LIMITER, request);
+
+ OverloadedException exception = overload == Overload.REQUESTS
+ ? new OverloadedException(String.format("Request breached
global limit of %d requests/second. Server is " +
+ "currently in an
overloaded state and cannot accept more requests.",
+
GLOBAL_REQUEST_LIMITER.getRate()))
+ : new OverloadedException(String.format("Request breached
limit on bytes in flight. (%s)) " +
+ "Server is
currently in an overloaded state and cannot accept more requests.",
+
+ endpointPayloadTracker));
+
+ throw ErrorMessage.wrap(exception,
request.getSource().header.streamId);
+ }
@Override
public void channelInactive(ChannelHandlerContext ctx)
{
endpointPayloadTracker.release();
- if (paused)
+ if (!ctx.channel().config().isAutoRead())
{
- paused = false;
ClientMetrics.instance.unpauseConnection();
}
ctx.fireChannelInactive();
@@ -181,7 +251,7 @@ public class PreV5Handlers
public static final ProtocolDecoder instance = new ProtocolDecoder();
private ProtocolDecoder(){}
- public void decode(ChannelHandlerContext ctx, Envelope source, List
results)
+ public void decode(ChannelHandlerContext ctx, Envelope source,
List<Object> results)
{
try
{
@@ -212,7 +282,7 @@ public class PreV5Handlers
{
public static final ProtocolEncoder instance = new ProtocolEncoder();
private ProtocolEncoder(){}
- public void encode(ChannelHandlerContext ctx, Message source, List
results)
+ public void encode(ChannelHandlerContext ctx, Message source,
List<Object> results)
{
ProtocolVersion version = getConnectionVersion(ctx);
results.add(source.encode(version));
@@ -244,22 +314,8 @@ public class PreV5Handlers
if (isFatal(cause))
future.addListener((ChannelFutureListener) f ->
ctx.close());
}
- if (Throwables.anyCauseMatches(cause, t -> t instanceof
ProtocolException))
- {
- // if any ProtocolExceptions is not silent, then handle
- if (Throwables.anyCauseMatches(cause, t -> t instanceof
ProtocolException && !((ProtocolException) t).isSilent()))
- {
- ClientMetrics.instance.markProtocolException();
- // since protocol exceptions are expected to be client
issues, not logging stack trace
- // to avoid spamming the logs once a bad client shows up
- NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1,
TimeUnit.MINUTES, "Protocol exception with client networking: " +
cause.getMessage());
- }
- }
- else
- {
- ClientMetrics.instance.markUnknownException();
- logger.warn("Unknown exception in client networking", cause);
- }
+
+ ExceptionHandlers.logClientNetworkingExceptions(cause);
JVMStabilityInspector.inspectThrowable(cause);
}
diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java
b/src/java/org/apache/cassandra/transport/SimpleClient.java
index ae89e93..87e54a1 100644
--- a/src/java/org/apache/cassandra/transport/SimpleClient.java
+++ b/src/java/org/apache/cassandra/transport/SimpleClient.java
@@ -27,6 +27,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.primitives.Ints;
+import org.apache.cassandra.transport.ClientResourceLimits.Overload;
+import org.apache.cassandra.utils.concurrent.NonBlockingRateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,9 +52,13 @@ import org.apache.cassandra.transport.messages.*;
import static org.apache.cassandra.transport.CQLMessageHandler.envelopeSize;
import static org.apache.cassandra.transport.Flusher.MAX_FRAMED_PAYLOAD_SIZE;
+import static
org.apache.cassandra.utils.concurrent.NonBlockingRateLimiter.NO_OP_LIMITER;
public class SimpleClient implements Closeable
{
+
+ public static final int TIMEOUT_SECONDS = 10;
+
static
{
InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory());
@@ -281,7 +287,7 @@ public class SimpleClient implements Closeable
{
request.attach(connection);
lastWriteFuture =
channel.writeAndFlush(Collections.singletonList(request));
- Message.Response msg = responseHandler.responses.poll(10,
TimeUnit.SECONDS);
+ Message.Response msg =
responseHandler.responses.poll(TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (msg == null)
throw new RuntimeException("timeout");
if (throwOnErrorResponse && msg instanceof ErrorMessage)
@@ -310,7 +316,7 @@ public class SimpleClient implements Closeable
}
lastWriteFuture = channel.writeAndFlush(requests);
- long deadline = System.currentTimeMillis() +
TimeUnit.SECONDS.toMillis(10);
+ long deadline = System.currentTimeMillis() +
TimeUnit.SECONDS.toMillis(TIMEOUT_SECONDS);
for (int i = 0; i < requests.size(); i++)
{
Message.Response msg =
responseHandler.responses.poll(deadline - System.currentTimeMillis(),
TimeUnit.MILLISECONDS);
@@ -393,7 +399,7 @@ public class SimpleClient implements Closeable
case AUTHENTICATE:
if
(response.header.version.isGreaterOrEqualTo(ProtocolVersion.V5))
{
- configureModernPipeline(ctx, response);
+ configureModernPipeline(ctx, response,
largeMessageThreshold);
// consuming the message is done when setting up the
pipeline
}
else
@@ -414,7 +420,7 @@ public class SimpleClient implements Closeable
}
}
- private void configureModernPipeline(ChannelHandlerContext ctx,
Envelope response)
+ private void configureModernPipeline(ChannelHandlerContext ctx,
Envelope response, int largeMessageThreshold)
{
logger.info("Configuring modern pipeline");
ChannelPipeline pipeline = ctx.pipeline();
@@ -434,7 +440,7 @@ public class SimpleClient implements Closeable
FrameEncoder frameEncoder = frameEncoder(ctx);
FrameEncoder.PayloadAllocator payloadAllocator =
frameEncoder.allocator();
- CQLMessageHandler.MessageConsumer<Message.Response>
responseConsumer = (c, message, converter) -> {
+ CQLMessageHandler.MessageConsumer<Message.Response>
responseConsumer = (c, message, converter, backpressured) -> {
responseHandler.handleResponse(c, message);
};
@@ -470,6 +476,12 @@ public class SimpleClient implements Closeable
return endpointQueue;
}
+ @Override
+ public NonBlockingRateLimiter requestRateLimiter()
+ {
+ return NO_OP_LIMITER;
+ }
+
public void release()
{
}
@@ -512,7 +524,7 @@ public class SimpleClient implements Closeable
Connection connection =
ctx.channel().attr(Connection.attributeKey).get();
// The only case the connection can be null is when we
send the initial STARTUP message (client side thus)
ProtocolVersion version = connection == null ?
ProtocolVersion.CURRENT : connection.getVersion();
- SimpleFlusher flusher = new SimpleFlusher(frameEncoder);
+ SimpleFlusher flusher = new SimpleFlusher(frameEncoder,
largeMessageThreshold);
for (Message message : (List<Message>) msg)
flusher.enqueue(message.encode(version));
@@ -522,7 +534,7 @@ public class SimpleClient implements Closeable
pipeline.remove(this);
Message.Response message = messageDecoder.decode(ctx.channel(),
response);
- responseConsumer.accept(channel, message, (ch, req, resp) -> null);
+ responseConsumer.accept(channel, message, (ch, req, resp) -> null,
Overload.NONE);
}
private FrameDecoder frameDecoder(ChannelHandlerContext ctx,
BufferPoolAllocator allocator)
@@ -669,10 +681,17 @@ public class SimpleClient implements Closeable
final Queue<Envelope> outbound = new ConcurrentLinkedQueue<>();
final FrameEncoder frameEncoder;
private final AtomicBoolean scheduled = new AtomicBoolean(false);
+ private final int largeMessageThreshold;
- SimpleFlusher(FrameEncoder frameEncoder)
+ SimpleFlusher(FrameEncoder frameEncoder, int largeMessageThreshold)
{
this.frameEncoder = frameEncoder;
+ this.largeMessageThreshold = largeMessageThreshold;
+ }
+
+ SimpleFlusher(FrameEncoder frameEncoder)
+ {
+ this(frameEncoder, MAX_FRAMED_PAYLOAD_SIZE);
}
public void enqueue(Envelope message)
@@ -709,14 +728,14 @@ public class SimpleClient implements Closeable
Envelope f;
while ((f = outbound.poll()) != null)
{
- if (f.header.bodySizeInBytes > MAX_FRAMED_PAYLOAD_SIZE)
+ if (f.header.bodySizeInBytes > largeMessageThreshold)
{
combiner.addAll(writeLargeMessage(ctx, f));
}
else
{
int messageSize = envelopeSize(f.header);
- if (bufferSize + messageSize >= MAX_FRAMED_PAYLOAD_SIZE)
+ if (bufferSize + messageSize >= largeMessageThreshold)
{
combiner.add(flushBuffer(ctx, buffer, bufferSize));
buffer = new ArrayList<>();
@@ -751,9 +770,9 @@ public class SimpleClient implements Closeable
private FrameEncoder.Payload allocate(int size, boolean selfContained)
{
FrameEncoder.Payload payload = frameEncoder.allocator()
-
.allocate(selfContained, Math.min(size, MAX_FRAMED_PAYLOAD_SIZE));
- if (size >= MAX_FRAMED_PAYLOAD_SIZE)
- payload.buffer.limit(MAX_FRAMED_PAYLOAD_SIZE);
+
.allocate(selfContained, Math.min(size, largeMessageThreshold));
+ if (size >= largeMessageThreshold)
+ payload.buffer.limit(largeMessageThreshold);
return payload;
}
@@ -766,14 +785,14 @@ public class SimpleClient implements Closeable
boolean firstFrame = true;
while (f.body.readableBytes() > 0 || firstFrame)
{
- int payloadSize = Math.min(f.body.readableBytes(),
MAX_FRAMED_PAYLOAD_SIZE);
+ int payloadSize = Math.min(f.body.readableBytes(),
largeMessageThreshold);
payload = allocate(f.body.readableBytes(), false);
buf = payload.buffer;
// BufferPool may give us a buffer larger than we asked for.
// FrameEncoder may object if buffer.remaining is >= MAX_SIZE.
- if (payloadSize >= MAX_FRAMED_PAYLOAD_SIZE)
- buf.limit(MAX_FRAMED_PAYLOAD_SIZE);
+ if (payloadSize >= largeMessageThreshold)
+ buf.limit(largeMessageThreshold);
if (firstFrame)
{
diff --git
a/src/java/org/apache/cassandra/utils/concurrent/NonBlockingRateLimiter.java
b/src/java/org/apache/cassandra/utils/concurrent/NonBlockingRateLimiter.java
new file mode 100644
index 0000000..2a98222
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/concurrent/NonBlockingRateLimiter.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils.concurrent;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Ticker;
+
+import javax.annotation.concurrent.ThreadSafe;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.lang.Math.toIntExact;
+
+/**
+ * A rate limiter implementation that allows callers to reserve permits that may only be available
+ * in the future, delegating to them decisions about how to schedule/delay work and whether or not
+ * to block execution to do so.
+ * <p>
+ * Permits are modeled as slots on a timeline measured in nanoseconds elapsed since construction.
+ * Reservations advance {@link #nextAvailable} with a lock-free compare-and-set loop, while rate
+ * changes are serialized by the synchronized {@link #setRate(int, Ticker)}.
+ */
+@SuppressWarnings("UnstableApiUsage")
+@ThreadSafe
+public class NonBlockingRateLimiter
+{
+    public static final long NANOS_PER_SECOND = TimeUnit.SECONDS.toNanos(1);
+    static final long DEFAULT_BURST_NANOS = NANOS_PER_SECOND;
+
+    /** a starting time for elapsed time calculations, read from the ticker supplied at construction */
+    private final long startedNanos;
+
+    /** nanoseconds from start time corresponding to the next available permit */
+    private final AtomicLong nextAvailable = new AtomicLong();
+
+    /** source of time for elapsed time calculations; must be the same clock {@link #startedNanos} came from */
+    private volatile Ticker ticker;
+
+    private volatile int permitsPerSecond;
+
+    /** time in nanoseconds between permits on the timeline */
+    private volatile long intervalNanos;
+
+    /**
+     * To allow the limiter to more closely adhere to the configured rate in the face of
+     * unevenly distributed permit requests, it will allow a number of permits equal to
+     * burstNanos / intervalNanos to be issued in a "burst" before reaching a steady state.
+     *
+     * Another way to think about this is that it allows us to bring forward the permits
+     * from short periods of inactivity. This is especially useful when the upstream user
+     * of the limiter delays request processing using a mechanism whose overhead is longer
+     * than intervalNanos.
+     */
+    private final long burstNanos;
+
+    /**
+     * Creates a limiter driven by the system ticker, using the default burst window.
+     *
+     * @param permitsPerSecond the initial rate; must be in (0, {@link #NANOS_PER_SECOND}]
+     */
+    public NonBlockingRateLimiter(int permitsPerSecond)
+    {
+        this(permitsPerSecond, DEFAULT_BURST_NANOS, Ticker.systemTicker());
+    }
+
+    /**
+     * @param permitsPerSecond the initial rate; must be in (0, {@link #NANOS_PER_SECOND}]
+     * @param burstNanos how far into the past unused permits may be brought forward
+     * @param ticker the clock used for all elapsed time calculations
+     */
+    @VisibleForTesting
+    public NonBlockingRateLimiter(int permitsPerSecond, long burstNanos, Ticker ticker)
+    {
+        this.startedNanos = ticker.read();
+        this.burstNanos = burstNanos;
+        setRate(permitsPerSecond, ticker);
+    }
+
+    /**
+     * Changes the rate, keeping the ticker this limiter was configured with.
+     *
+     * @param permitsPerSecond the new rate; must be in (0, {@link #NANOS_PER_SECOND}]
+     * @see #setRate(int, Ticker)
+     */
+    public void setRate(int permitsPerSecond)
+    {
+        // startedNanos was read from the configured ticker, so switching to a different clock here
+        // (e.g. the system ticker for a limiter built with a custom one) would corrupt nanosElapsed().
+        setRate(permitsPerSecond, ticker);
+    }
+
+    /**
+     * Changes the rate and ticker, then resets the timeline so the next permit is available
+     * immediately. Any accumulated burst credit, and any backlog of future reservations, is discarded.
+     *
+     * @throws IllegalArgumentException if permitsPerSecond is not in (0, {@link #NANOS_PER_SECOND}]
+     */
+    @VisibleForTesting
+    public synchronized void setRate(int permitsPerSecond, Ticker ticker)
+    {
+        Preconditions.checkArgument(permitsPerSecond > 0, "permits/second must be positive");
+        Preconditions.checkArgument(permitsPerSecond <= NANOS_PER_SECOND, "permits/second cannot be greater than " + NANOS_PER_SECOND);
+
+        this.ticker = ticker;
+        this.permitsPerSecond = permitsPerSecond;
+        intervalNanos = NANOS_PER_SECOND / permitsPerSecond;
+        nextAvailable.set(nanosElapsed());
+    }
+
+    /**
+     * @return the number of available permits per second
+     */
+    public int getRate()
+    {
+        return permitsPerSecond;
+    }
+
+    /**
+     * Reserves a single permit slot on the timeline which may not yet be available. This always
+     * succeeds and advances the timeline by one interval.
+     *
+     * @param delayUnit the unit of the returned delay; conversion uses {@link TimeUnit#convert}, which
+     *                  truncates, so with a coarse unit a wait shorter than one unit is reported as zero
+     * @return time until the reserved permit will be available (or zero if it already is) in the specified units
+     */
+    public long reserveAndGetDelay(TimeUnit delayUnit)
+    {
+        long nowNanos = nanosElapsed();
+
+        for (;;)
+        {
+            long prev = nextAvailable.get();
+            long interval = this.intervalNanos;
+
+            // Push the first available permit slot up to the burst window if necessary.
+            long firstAvailable = Math.max(prev, nowNanos - burstNanos);
+
+            // Advance the configured interval starting from the bounded previous permit slot.
+            if (nextAvailable.compareAndSet(prev, firstAvailable + interval))
+                // If the time now is before the first available slot, return the delay.
+                return delayUnit.convert(Math.max(0, firstAvailable - nowNanos), TimeUnit.NANOSECONDS);
+        }
+    }
+
+    /**
+     * Reserves a single permit slot on the timeline, but only if one is available now.
+     *
+     * @return true if a permit is available, false if one is not
+     */
+    public boolean tryReserve()
+    {
+        long nowNanos = nanosElapsed();
+
+        for (;;)
+        {
+            long prev = nextAvailable.get();
+            long interval = this.intervalNanos;
+
+            // Push the first available permit slot up to the burst window if necessary.
+            long firstAvailable = Math.max(prev, nowNanos - burstNanos);
+
+            // If we haven't reached the time for the first available permit, we've failed to reserve.
+            if (nowNanos < firstAvailable)
+                return false;
+
+            // Advance the configured interval starting from the bounded previous permit slot.
+            // If another thread has already taken the next slot, retry.
+            if (nextAvailable.compareAndSet(prev, firstAvailable + interval))
+                return true;
+        }
+    }
+
+    @VisibleForTesting
+    public long getIntervalNanos()
+    {
+        return intervalNanos;
+    }
+
+    @VisibleForTesting
+    public long getStartedNanos()
+    {
+        return startedNanos;
+    }
+
+    /** @return nanoseconds elapsed on the configured ticker since construction */
+    private long nanosElapsed()
+    {
+        return ticker.read() - startedNanos;
+    }
+
+    /**
+     * A limiter that never delays or rejects. Its nominal rate is the maximum {@link #setRate} accepts,
+     * but both reservation methods are overridden, so the rate is never consulted.
+     */
+    public static final NonBlockingRateLimiter NO_OP_LIMITER = new NonBlockingRateLimiter(toIntExact(NANOS_PER_SECOND))
+    {
+        @Override
+        public long reserveAndGetDelay(TimeUnit delayUnit)
+        {
+            return 0;
+        }
+
+        @Override
+        public boolean tryReserve()
+        {
+            return true;
+        }
+    };
+}
diff --git a/test/burn/org/apache/cassandra/transport/BurnTestUtil.java
b/test/burn/org/apache/cassandra/transport/BurnTestUtil.java
index 9610455..c8017d1 100644
--- a/test/burn/org/apache/cassandra/transport/BurnTestUtil.java
+++ b/test/burn/org/apache/cassandra/transport/BurnTestUtil.java
@@ -35,6 +35,9 @@ import org.apache.cassandra.net.AbstractMessageHandler;
import org.apache.cassandra.net.ResourceLimits;
import org.apache.cassandra.transport.messages.QueryMessage;
import org.apache.cassandra.transport.messages.ResultMessage;
+import org.apache.cassandra.utils.concurrent.NonBlockingRateLimiter;
+
+import static
org.apache.cassandra.utils.concurrent.NonBlockingRateLimiter.NO_OP_LIMITER;
public class BurnTestUtil
{
@@ -143,6 +146,12 @@ public class BurnTestUtil
return delegate.endpointWaitQueue();
}
+        /** Burn tests should never be throttled by the request rate limiter. */
+        @Override
+        public NonBlockingRateLimiter requestRateLimiter()
+        {
+            return NonBlockingRateLimiter.NO_OP_LIMITER;
+        }
+
public void release()
{
delegate.release();
diff --git a/test/burn/org/apache/cassandra/transport/SimpleClientPerfTest.java
b/test/burn/org/apache/cassandra/transport/SimpleClientPerfTest.java
index 20b8cb3..0427ad9 100644
--- a/test/burn/org/apache/cassandra/transport/SimpleClientPerfTest.java
+++ b/test/burn/org/apache/cassandra/transport/SimpleClientPerfTest.java
@@ -23,12 +23,14 @@ import java.net.ServerSocket;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
-import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+import com.google.common.util.concurrent.RateLimiter;
+import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -41,11 +43,13 @@ import org.apache.cassandra.auth.AllowAllAuthorizer;
import org.apache.cassandra.auth.AllowAllNetworkAuthorizer;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.EncryptionOptions;
+import org.apache.cassandra.exceptions.OverloadedException;
import org.apache.cassandra.metrics.ClientMetrics;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.transport.messages.QueryMessage;
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.utils.AssertUtil;
+import org.apache.cassandra.utils.Throwables;
import static org.apache.cassandra.transport.BurnTestUtil.SizeCaps;
import static org.apache.cassandra.transport.BurnTestUtil.generateQueryMessage;
@@ -57,7 +61,7 @@ public class SimpleClientPerfTest
@Parameterized.Parameter
public ProtocolVersion version;
- @Parameterized.Parameters()
+ @Parameterized.Parameters(name="{0}")
public static Collection<Object[]> versions()
{
return ProtocolVersion.SUPPORTED.stream()
@@ -90,6 +94,7 @@ public class SimpleClientPerfTest
}
}
+ @SuppressWarnings({"IOResourceOpenedButNotSafelyClosed", "resource"})
@Test
public void measureSmall() throws Throwable
{
@@ -102,6 +107,7 @@ public class SimpleClientPerfTest
version);
}
+ @SuppressWarnings({"IOResourceOpenedButNotSafelyClosed", "resource"})
@Test
public void measureSmallWithCompression() throws Throwable
{
@@ -114,6 +120,7 @@ public class SimpleClientPerfTest
version);
}
+ @SuppressWarnings({"IOResourceOpenedButNotSafelyClosed", "resource"})
@Test
public void measureLarge() throws Throwable
{
@@ -126,6 +133,7 @@ public class SimpleClientPerfTest
version);
}
+ @SuppressWarnings({"IOResourceOpenedButNotSafelyClosed", "resource"})
@Test
public void measureLargeWithCompression() throws Throwable
{
@@ -138,6 +146,7 @@ public class SimpleClientPerfTest
version);
}
+ @SuppressWarnings({"UnstableApiUsage", "UseOfSystemOutOrSystemErr",
"ResultOfMethodCallIgnored"})
public void perfTest(SizeCaps requestCaps, SizeCaps responseCaps,
AssertUtil.ThrowingSupplier<SimpleClient> clientSupplier, ProtocolVersion
version) throws Throwable
{
ResultMessage.Rows response = generateRows(0, responseCaps);
@@ -190,7 +199,9 @@ public class SimpleClientPerfTest
AtomicBoolean measure = new AtomicBoolean(false);
DescriptiveStatistics stats = new DescriptiveStatistics();
Lock lock = new ReentrantLock();
-
+ RateLimiter limiter = RateLimiter.create(2000);
+ AtomicLong overloadedExceptions = new AtomicLong(0);
+
// TODO: exercise client -> server large messages
for (int t = 0; t < threads; t++)
{
@@ -203,24 +214,53 @@ public class SimpleClientPerfTest
for (int j = 0; j < 1; j++)
messages.add(requestMessage);
- if (measure.get())
- {
- long nanoStart = System.nanoTime();
- client.execute(messages);
- long diff = System.nanoTime() - nanoStart;
-
- lock.lock();
- try
+ if (measure.get())
{
-
stats.addValue(TimeUnit.MICROSECONDS.toMillis(diff));
+ try
+ {
+ limiter.acquire();
+ long nanoStart = System.nanoTime();
+ client.execute(messages);
+ long elapsed = System.nanoTime() -
nanoStart;
+
+ lock.lock();
+ try
+ {
+
stats.addValue(TimeUnit.NANOSECONDS.toMicros(elapsed));
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+ catch (RuntimeException e)
+ {
+ if (Throwables.anyCauseMatches(e, cause ->
cause instanceof OverloadedException))
+ {
+ overloadedExceptions.incrementAndGet();
+ }
+ else
+ {
+ throw e;
+ }
+ }
}
- finally
+ else
{
- lock.unlock();
+ try
+ {
+ limiter.acquire();
+ client.execute(messages); // warm-up
+ }
+ catch (RuntimeException e)
+ {
+ // Ignore overloads during warmup...
+ if (!Throwables.anyCauseMatches(e, cause
-> cause instanceof OverloadedException))
+ {
+ throw e;
+ }
+ }
}
- }
- else
- client.execute(messages); // warm-up
}
}
catch (Throwable e)
@@ -240,14 +280,19 @@ public class SimpleClientPerfTest
System.out.println("requestSize = " + requestSize);
System.out.println("responseSize = " + responseSize);
+
+ System.out.println("Latencies (in microseconds)");
+ System.out.println("Elements: " + stats.getN());
System.out.println("Mean: " + stats.getMean());
System.out.println("Variance: " + stats.getVariance());
System.out.println("Median: " + stats.getPercentile(0.5));
System.out.println("90p: " + stats.getPercentile(0.90));
System.out.println("95p: " + stats.getPercentile(0.95));
System.out.println("99p: " + stats.getPercentile(0.99));
+ System.out.println("Max: " + stats.getMax());
+
+ System.out.println("Failed due to overload: " +
overloadedExceptions.get());
server.stop();
}
}
-
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java
b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index 59968fe..f3c279c 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -50,6 +50,7 @@ import org.junit.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.codahale.metrics.Gauge;
import com.datastax.driver.core.*;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.ResultSet;
@@ -63,6 +64,7 @@ import org.apache.cassandra.index.SecondaryIndexManager;
import org.apache.cassandra.config.EncryptionOptions;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.metrics.CassandraMetricsRegistry;
import org.apache.cassandra.metrics.ClientMetrics;
import org.apache.cassandra.schema.*;
import org.apache.cassandra.config.DatabaseDescriptor;
@@ -90,7 +92,10 @@ import org.apache.cassandra.utils.JMXServerUtils;
import static
com.datastax.driver.core.SocketOptions.DEFAULT_CONNECT_TIMEOUT_MILLIS;
import static
com.datastax.driver.core.SocketOptions.DEFAULT_READ_TIMEOUT_MILLIS;
-import static junit.framework.Assert.assertNotNull;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
/**
* Base class for CQL tests.
@@ -941,6 +946,23 @@ public abstract class CQLTester
Assert.assertEquals(expectedArgTypes != null ?
Arrays.asList(expectedArgTypes) : null, schemaChange.argTypes);
}
+    /**
+     * Asserts that the response carries warnings and that at least one of them contains the given text.
+     *
+     * @param response the response whose warnings are inspected
+     * @param message  the substring expected in at least one warning
+     */
+    protected static void assertWarningsContain(Message.Response response, String message)
+    {
+        List<String> warnings = response.getWarnings();
+        assertNotNull("Expected a warning containing '" + message + "', but the response had no warnings", warnings);
+        // Include the actual warnings so a failure is diagnosable without re-running the test.
+        assertTrue(String.format("Expected a warning containing '%s', but got %s", message, warnings),
+                   warnings.stream().anyMatch(s -> s.contains(message)));
+    }
+
+    /**
+     * Asserts that none of the response's warnings (if it has any) contains the given text.
+     *
+     * @param response the response whose warnings are inspected
+     * @param message  the substring that must not appear in any warning
+     */
+    protected static void assertNoWarningContains(Message.Response response, String message)
+    {
+        List<String> warnings = response.getWarnings();
+
+        // A response with no warnings trivially satisfies the assertion.
+        if (warnings == null)
+            return;
+
+        assertFalse(String.format("Expected no warning containing '%s', but got %s", message, warnings),
+                    warnings.stream().anyMatch(s -> s.contains(message)));
+    }
+
protected static ResultMessage schemaChange(String query)
{
try
@@ -1806,6 +1828,17 @@ public abstract class CQLTester
return clusters.get(protocolVersion).getMetadata().newTupleType(types);
}
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ protected static Gauge<Integer> getPausedConnectionsGauge()
+ {
+ String metricName =
"org.apache.cassandra.metrics.Client.PausedConnections";
+ Map<String, Gauge> metrics =
CassandraMetricsRegistry.Metrics.getGauges((name, metric) ->
name.equals(metricName));
+ if (metrics.size() != 1)
+ fail(String.format("Expected a single registered metric for paused
client connections, found %s",
+ metrics.size()));
+ return metrics.get(metricName);
+ }
+
// Attempt to find an AbstracType from a value (for serialization/printing
sake).
// Will work as long as we use types we know of, which is good enough for
testing
private static AbstractType typeFor(Object value)
diff --git a/test/unit/org/apache/cassandra/net/ResourceLimitsTest.java
b/test/unit/org/apache/cassandra/net/ResourceLimitsTest.java
index f2f8a01..5c2ecbe 100644
--- a/test/unit/org/apache/cassandra/net/ResourceLimitsTest.java
+++ b/test/unit/org/apache/cassandra/net/ResourceLimitsTest.java
@@ -23,9 +23,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.LongFunction;
-import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.KillerForTests;
import org.junit.Assert;
diff --git a/test/unit/org/apache/cassandra/transport/CQLConnectionTest.java
b/test/unit/org/apache/cassandra/transport/CQLConnectionTest.java
index 96f28a2..73950ae 100644
--- a/test/unit/org/apache/cassandra/transport/CQLConnectionTest.java
+++ b/test/unit/org/apache/cassandra/transport/CQLConnectionTest.java
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.*;
+import org.apache.cassandra.transport.ClientResourceLimits.Overload;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
@@ -55,10 +56,12 @@ import
org.apache.cassandra.transport.CQLMessageHandler.MessageConsumer;
import org.apache.cassandra.transport.messages.*;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.concurrent.SimpleCondition;
+import org.apache.cassandra.utils.concurrent.NonBlockingRateLimiter;
import static
org.apache.cassandra.config.EncryptionOptions.TlsEncryptionPolicy.UNENCRYPTED;
import static org.apache.cassandra.net.FramingTest.randomishBytes;
import static org.apache.cassandra.transport.Flusher.MAX_FRAMED_PAYLOAD_SIZE;
+import static
org.apache.cassandra.utils.concurrent.NonBlockingRateLimiter.NO_OP_LIMITER;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -622,7 +625,7 @@ public class CQLConnectionTest
this.frameEncoder = frameEncoder;
}
- public void accept(Channel channel, Message.Request message,
Dispatcher.FlushItemConverter toFlushItem)
+ public void accept(Channel channel, Message.Request message,
Dispatcher.FlushItemConverter toFlushItem, Overload backpressure)
{
if (flusher == null)
flusher = new SimpleClient.SimpleFlusher(frameEncoder);
@@ -755,6 +758,12 @@ public class CQLConnectionTest
return delegate.endpointWaitQueue();
}
+        /** Connection tests exercise framing and byte-based limits only, so request rate limiting is disabled. */
+        @Override
+        public NonBlockingRateLimiter requestRateLimiter()
+        {
+            return NonBlockingRateLimiter.NO_OP_LIMITER;
+        }
+
public void release()
{
delegate.release();
diff --git
a/test/unit/org/apache/cassandra/transport/ClientResourceLimitsTest.java
b/test/unit/org/apache/cassandra/transport/ClientResourceLimitsTest.java
index 9cc900a..5cea90c 100644
--- a/test/unit/org/apache/cassandra/transport/ClientResourceLimitsTest.java
+++ b/test/unit/org/apache/cassandra/transport/ClientResourceLimitsTest.java
@@ -19,7 +19,6 @@
package org.apache.cassandra.transport;
import java.io.IOException;
-import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
@@ -29,9 +28,9 @@ import java.util.function.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Ints;
+import org.apache.cassandra.service.StorageService;
import org.junit.*;
-import com.codahale.metrics.Gauge;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.cql3.QueryOptions;
@@ -40,13 +39,11 @@ import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.virtual.*;
import org.apache.cassandra.exceptions.OverloadedException;
-import org.apache.cassandra.metrics.CassandraMetricsRegistry;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.transport.messages.QueryMessage;
import org.apache.cassandra.utils.FBUtilities;
import org.awaitility.Awaitility;
-import static org.apache.cassandra.Util.spinAssertEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
@@ -55,19 +52,18 @@ import static org.junit.Assert.fail;
public class ClientResourceLimitsTest extends CQLTester
{
-
private static final long LOW_LIMIT = 600L;
private static final long HIGH_LIMIT = 5000000000L;
- private static final QueryOptions V5_DEFAULT_OPTIONS = QueryOptions.create(
- QueryOptions.DEFAULT.getConsistency(),
- QueryOptions.DEFAULT.getValues(),
- QueryOptions.DEFAULT.skipMetadata(),
- QueryOptions.DEFAULT.getPageSize(),
- QueryOptions.DEFAULT.getPagingState(),
- QueryOptions.DEFAULT.getSerialConsistency(),
- ProtocolVersion.V5,
- KEYSPACE);
+ private static final QueryOptions V5_DEFAULT_OPTIONS =
+ QueryOptions.create(QueryOptions.DEFAULT.getConsistency(),
+ QueryOptions.DEFAULT.getValues(),
+ QueryOptions.DEFAULT.skipMetadata(),
+ QueryOptions.DEFAULT.getPageSize(),
+ QueryOptions.DEFAULT.getPagingState(),
+ QueryOptions.DEFAULT.getSerialConsistency(),
+ ProtocolVersion.V5,
+ KEYSPACE);
@BeforeClass
public static void setUp()
@@ -75,7 +71,7 @@ public class ClientResourceLimitsTest extends CQLTester
DatabaseDescriptor.setNativeTransportReceiveQueueCapacityInBytes(1);
DatabaseDescriptor.setNativeTransportMaxConcurrentRequestsInBytesPerIp(LOW_LIMIT);
DatabaseDescriptor.setNativeTransportMaxConcurrentRequestsInBytes(LOW_LIMIT);
-
+
// The driver control connections would send queries that might
interfere with the tests.
requireNetworkWithoutDriver();
}
@@ -124,7 +120,7 @@ public class ClientResourceLimitsTest extends CQLTester
}
}
- @SuppressWarnings("resource")
+ @SuppressWarnings({"resource", "SameParameterValue"})
private SimpleClient client(boolean throwOnOverload, int
largeMessageThreshold)
{
try
@@ -184,27 +180,34 @@ public class ClientResourceLimitsTest extends CQLTester
(provider) -> provider.endpointWaitQueue().signal());
}
- private void backPressureTest(Runnable limitLifter,
Consumer<ClientResourceLimits.ResourceProvider> signaller)
- throws Throwable
+ private void backPressureTest(Runnable limitLifter,
Consumer<ClientResourceLimits.ResourceProvider> signaller) throws Throwable
{
final AtomicReference<Exception> error = new AtomicReference<>();
final CountDownLatch started = new CountDownLatch(1);
final CountDownLatch complete = new CountDownLatch(1);
- try(SimpleClient client = client(false))
+
+ try (SimpleClient client = client(false))
{
- QueryMessage queryMessage = new QueryMessage("CREATE TABLE atable
(pk int PRIMARY KEY, v text)",
- V5_DEFAULT_OPTIONS);
- client.execute(queryMessage);
-
- // There should be no paused client connections yet
- Gauge<Integer> pausedConnections = getPausedConnectionsGauge();
- int before = pausedConnections.getValue();
-
+ // The first query does not trigger backpressure/pause the
connection:
+ QueryMessage queryMessage =
+ new QueryMessage("CREATE TABLE atable (pk int PRIMARY KEY,
v text)", V5_DEFAULT_OPTIONS);
+ Message.Response belowThresholdResponse =
client.execute(queryMessage);
+ assertEquals(0, getPausedConnectionsGauge().getValue().intValue());
+ assertNoWarningContains(belowThresholdResponse, "bytes in flight");
+
+ // A second query triggers backpressure but is allowed to
complete...
+ Message.Response aboveThresholdResponse =
client.execute(queryMessage());
+ assertEquals(1, getPausedConnectionsGauge().getValue().intValue());
+ assertWarningsContain(aboveThresholdResponse, "bytes in flight");
+
+ // ...and a third request is paused.
+ final AtomicReference<Message.Response> response = new
AtomicReference<>();
+
Thread t = new Thread(() -> {
try
{
started.countDown();
- client.execute(queryMessage());
+ response.set(client.execute(queryMessage()));
complete.countDown();
}
catch (Exception e)
@@ -216,19 +219,13 @@ public class ClientResourceLimitsTest extends CQLTester
});
t.start();
- // When the client attempts to execute the second query, the
backpressure
- // mechanism should report the client connection is paused
- assertTrue(started.await(5, TimeUnit.SECONDS));
- spinAssertEquals("Timed out after waiting 5 seconds for paused " +
- "connections metric to increment due to
backpressure",
- before + 1, pausedConnections::getValue, 5,
TimeUnit.SECONDS);
-
// verify the request hasn't completed
assertFalse(complete.await(1, TimeUnit.SECONDS));
// backpressure has been applied, if we increase the limits of the
exhausted reserve and signal
// the appropriate WaitQueue, it should be released and the client
request will complete
limitLifter.run();
+
// We need a ResourceProvider to get access to the WaitQueue
ClientResourceLimits.Allocator allocator =
ClientResourceLimits.getAllocatorForEndpoint(FBUtilities.getJustLocalAddress());
ClientResourceLimits.ResourceProvider queueHandle = new
ClientResourceLimits.ResourceProvider.Default(allocator);
@@ -237,8 +234,10 @@ public class ClientResourceLimitsTest extends CQLTester
// SimpleClient has a 10 second timeout, so if we have to wait
// longer than that assume that we're not going to receive a
// reply. If all's well, the completion should happen immediately
- assertTrue(complete.await(11, TimeUnit.SECONDS));
+ assertTrue(complete.await(SimpleClient.TIMEOUT_SECONDS + 1,
TimeUnit.SECONDS));
assertNull(error.get());
+ assertEquals(0, getPausedConnectionsGauge().getValue().intValue());
+ assertNoWarningContains(response.get(), "bytes in flight");
}
}
@@ -304,17 +303,6 @@ public class ClientResourceLimitsTest extends CQLTester
return new QueryMessage(query.toString(), V5_DEFAULT_OPTIONS);
}
- @SuppressWarnings({"rawtypes", "unchecked"})
- private Gauge<Integer> getPausedConnectionsGauge()
- {
- String metricName =
"org.apache.cassandra.metrics.Client.PausedConnections";
- Map<String, Gauge> metrics =
CassandraMetricsRegistry.Metrics.getGauges((name, metric) ->
name.equals(metricName));
- if (metrics.size() != 1)
- fail(String.format("Expected a single registered metric for paused
client connections, found %s",
- metrics.size()));
- return metrics.get(metricName);
- }
-
@Test
public void testQueryUpdatesConcurrentMetricsUpdate() throws Throwable
{
@@ -469,4 +457,23 @@ public class ClientResourceLimitsTest extends CQLTester
client.close();
}
}
+
+    @Test
+    public void shouldChangeRequestsPerSecondAtRuntime()
+    {
+        // Raise the limit, then lower it, checking after each change that the configured value,
+        // the global limiter, and the StorageService view all agree.
+        for (int expectedRate : new int[]{ 100, 1000, 500 })
+        {
+            StorageService.instance.setNativeTransportMaxRequestsPerSecond(expectedRate);
+
+            assertEquals(expectedRate, ClientResourceLimits.getNativeTransportMaxRequestsPerSecond(), 0);
+            assertEquals(expectedRate, ClientResourceLimits.GLOBAL_REQUEST_LIMITER.getRate(), 0);
+            assertEquals(expectedRate, StorageService.instance.getNativeTransportMaxRequestsPerSecond());
+        }
+    }
}
diff --git a/test/unit/org/apache/cassandra/transport/RateLimitingTest.java
b/test/unit/org/apache/cassandra/transport/RateLimitingTest.java
new file mode 100644
index 0000000..8497c01
--- /dev/null
+++ b/test/unit/org/apache/cassandra/transport/RateLimitingTest.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.transport;
+
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import com.google.common.base.Ticker;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.service.StorageService;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.exceptions.OverloadedException;
+import org.apache.cassandra.transport.messages.QueryMessage;
+import org.apache.cassandra.utils.Throwables;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import static org.apache.cassandra.Util.spinAssertEquals;
+import static org.apache.cassandra.transport.ProtocolVersion.V4;
+
+@SuppressWarnings("UnstableApiUsage")
+@RunWith(Parameterized.class)
+public class RateLimitingTest extends CQLTester
+{
+ public static final String BACKPRESSURE_WARNING_SNIPPET = "Request
breached global limit";
+
+ private static final int LARGE_PAYLOAD_THRESHOLD_BYTES = 1000;
+ private static final int OVERLOAD_PERMITS_PER_SECOND = 1;
+
+ @Parameterized.Parameter
+ public ProtocolVersion version;
+
+ @Parameterized.Parameters(name="{0}")
+ public static Collection<Object[]> versions()
+ {
+ return ProtocolVersion.SUPPORTED.stream()
+ .map(v -> new Object[]{v})
+ .collect(Collectors.toList());
+ }
+
+ private AtomicLong tick;
+ private Ticker ticker;
+
+ @BeforeClass
+ public static void setup()
+ {
+ // If we don't exceed the queue capacity, we won't actually use the
global/endpoint
+ // bytes-in-flight limits, and the assertions we make below around
releasing them would be useless.
+ DatabaseDescriptor.setNativeTransportReceiveQueueCapacityInBytes(1);
+
+ // The driver control connections would send queries that might
interfere with the tests.
+ requireNetworkWithoutDriver();
+ }
+
+ @Before
+ public void resetLimits()
+ {
+ // Reset to the original start time in case a test advances the clock.
+ tick = new
AtomicLong(ClientResourceLimits.GLOBAL_REQUEST_LIMITER.getStartedNanos());
+
+ ticker = new Ticker()
+ {
+ @Override
+ public long read()
+ {
+ return tick.get();
+ }
+ };
+
+ ClientResourceLimits.setGlobalLimit(Long.MAX_VALUE);
+ }
+
+ @Test
+ public void shouldThrowOnOverloadSmallMessages() throws Exception
+ {
+ int payloadSize = LARGE_PAYLOAD_THRESHOLD_BYTES / 4;
+ testOverload(payloadSize, true);
+ }
+
+ @Test
+ public void shouldThrowOnOverloadLargeMessages() throws Exception
+ {
+ int payloadSize = LARGE_PAYLOAD_THRESHOLD_BYTES * 2;
+ testOverload(payloadSize, true);
+ }
+
+ @Test
+ public void shouldBackpressureSmallMessages() throws Exception
+ {
+ int payloadSize = LARGE_PAYLOAD_THRESHOLD_BYTES / 4;
+ testOverload(payloadSize, false);
+ }
+
+ @Test
+ public void shouldBackpressureLargeMessages() throws Exception
+ {
+ int payloadSize = LARGE_PAYLOAD_THRESHOLD_BYTES * 2;
+ testOverload(payloadSize, false);
+ }
+
+ @Test
+ public void shouldReleaseSmallMessageOnBytesInFlightOverload() throws
Exception
+ {
+ testBytesInFlightOverload(LARGE_PAYLOAD_THRESHOLD_BYTES / 4);
+ }
+
+ @Test
+ public void shouldReleaseLargeMessageOnBytesInFlightOverload() throws
Exception
+ {
+ testBytesInFlightOverload(LARGE_PAYLOAD_THRESHOLD_BYTES * 2);
+ }
+
+ private void testBytesInFlightOverload(int payloadSize) throws Exception
+ {
+ try (SimpleClient client = client().connect(false, true))
+ {
+
StorageService.instance.setNativeTransportRateLimitingEnabled(false);
+ QueryMessage queryMessage = new QueryMessage("CREATE TABLE IF NOT
EXISTS " + KEYSPACE + ".atable (pk int PRIMARY KEY, v text)", queryOptions());
+ client.execute(queryMessage);
+
+
StorageService.instance.setNativeTransportRateLimitingEnabled(true);
+
ClientResourceLimits.GLOBAL_REQUEST_LIMITER.setRate(OVERLOAD_PERMITS_PER_SECOND,
ticker);
+ ClientResourceLimits.setGlobalLimit(1);
+
+ try
+ {
+ // The first query takes the one available permit, but should
fail on the bytes in flight limit.
+ client.execute(queryMessage(payloadSize));
+ }
+ catch (RuntimeException e)
+ {
+ assertTrue(Throwables.anyCauseMatches(e, cause -> cause
instanceof OverloadedException));
+ }
+ }
+ finally
+ {
+ // Sanity check bytes in flight limiter.
+ assertEquals(0, ClientResourceLimits.getCurrentGlobalUsage());
+
StorageService.instance.setNativeTransportRateLimitingEnabled(false);
+ }
+ }
+
+ private void testOverload(int payloadSize, boolean throwOnOverload) throws
Exception
+ {
+ try (SimpleClient client = client().connect(false, throwOnOverload))
+ {
+
StorageService.instance.setNativeTransportRateLimitingEnabled(false);
+ QueryMessage queryMessage = new QueryMessage("CREATE TABLE IF NOT
EXISTS " + KEYSPACE + ".atable (pk int PRIMARY KEY, v text)", queryOptions());
+ client.execute(queryMessage);
+
+
StorageService.instance.setNativeTransportRateLimitingEnabled(true);
+
ClientResourceLimits.GLOBAL_REQUEST_LIMITER.setRate(OVERLOAD_PERMITS_PER_SECOND,
ticker);
+
+ if (throwOnOverload)
+ testThrowOnOverload(payloadSize, client);
+ else
+ {
+ testBackpressureOnOverload(payloadSize, client);
+ }
+ }
+ finally
+ {
+ // Sanity the check bytes in flight limiter.
+ assertEquals(0, ClientResourceLimits.getCurrentGlobalUsage());
+
StorageService.instance.setNativeTransportRateLimitingEnabled(false);
+ }
+ }
+
+ private void testBackpressureOnOverload(int payloadSize, SimpleClient
client) throws Exception
+ {
+ // The first query takes the one available permit.
+ Message.Response firstResponse =
client.execute(queryMessage(payloadSize));
+ assertEquals(0, getPausedConnectionsGauge().getValue().intValue());
+ assertNoWarningContains(firstResponse, BACKPRESSURE_WARNING_SNIPPET);
+
+ // The second query activates backpressure.
+ long overloadQueryStartTime = System.currentTimeMillis();
+ Message.Response response = client.execute(queryMessage(payloadSize));
+
+ // V3 does not support client warnings, but otherwise we should get
one for this query.
+ if (version.isGreaterOrEqualTo(V4))
+ assertWarningsContain(response, BACKPRESSURE_WARNING_SNIPPET);
+
+ AtomicReference<Throwable> error = new AtomicReference<>();
+ CountDownLatch started = new CountDownLatch(1);
+ CountDownLatch complete = new CountDownLatch(1);
+ AtomicReference<Message.Response> pausedQueryResponse = new
AtomicReference<>();
+
+ Thread queryRunner = new Thread(() ->
+ {
+ try
+ {
+ started.countDown();
+
pausedQueryResponse.set(client.execute(queryMessage(payloadSize)));
+ complete.countDown();
+ }
+ catch (Throwable t)
+ {
+ error.set(t);
+ }
+ });
+
+ // Advance the rater limiter so that this query will see an available
permit. This also
+ // means it should not produce a client warning, which we verify below.
+ // (Note that we advance 2 intervals for the 2 prior queries.)
+ tick.addAndGet(2 *
ClientResourceLimits.GLOBAL_REQUEST_LIMITER.getIntervalNanos());
+
+ queryRunner.start();
+
+ // ...and the request should complete without error.
+ assertTrue(complete.await(SimpleClient.TIMEOUT_SECONDS + 1,
TimeUnit.SECONDS));
+ assertNull(error.get());
+ assertNoWarningContains(pausedQueryResponse.get(),
BACKPRESSURE_WARNING_SNIPPET);
+
+ // At least the number of milliseconds in the permit interval should
already have elapsed
+ // since the start of the query that pauses the connection.
+ double permitIntervalMillis = (double) TimeUnit.SECONDS.toMillis(1L) /
OVERLOAD_PERMITS_PER_SECOND;
+ long sinceQueryStarted = System.currentTimeMillis() -
overloadQueryStartTime;
+ long remainingMillis = ((long) permitIntervalMillis) -
sinceQueryStarted;
+ assertTrue("Query completed before connection unpause!",
remainingMillis <= 0);
+
+ spinAssertEquals("Timed out after waiting 5 seconds for paused
connections metric to normalize.",
+ 0, () -> getPausedConnectionsGauge().getValue(), 5,
TimeUnit.SECONDS);
+ }
+
+ private void testThrowOnOverload(int payloadSize, SimpleClient client)
+ {
+ // The first query takes the one available permit...
+ client.execute(queryMessage(payloadSize));
+
+ try
+ {
+ // ...and the second breaches the limit....
+ client.execute(queryMessage(payloadSize));
+ }
+ catch (RuntimeException e)
+ {
+ assertTrue(Throwables.anyCauseMatches(e, cause -> cause instanceof
OverloadedException));
+ }
+
+ // Advance the timeline and verify that we can take a permit again.
+ // (Note that we don't take one when we throw on overload.)
+
tick.addAndGet(ClientResourceLimits.GLOBAL_REQUEST_LIMITER.getIntervalNanos());
+ client.execute(queryMessage(payloadSize));
+ }
+
+ private QueryMessage queryMessage(int length)
+ {
+ StringBuilder query = new StringBuilder("INSERT INTO " + KEYSPACE +
".atable (pk, v) VALUES (1, '");
+
+ for (int i = 0; i < length; i++)
+ {
+ query.append('a');
+ }
+
+ query.append("')");
+ return new QueryMessage(query.toString(), queryOptions());
+ }
+
+ private SimpleClient client()
+ {
+ return SimpleClient.builder(nativeAddr.getHostAddress(), nativePort)
+ .protocolVersion(version)
+ .useBeta()
+
.largeMessageThreshold(LARGE_PAYLOAD_THRESHOLD_BYTES)
+ .build();
+ }
+
+ private QueryOptions queryOptions()
+ {
+ return QueryOptions.create(QueryOptions.DEFAULT.getConsistency(),
+ QueryOptions.DEFAULT.getValues(),
+ QueryOptions.DEFAULT.skipMetadata(),
+ QueryOptions.DEFAULT.getPageSize(),
+ QueryOptions.DEFAULT.getPagingState(),
+ QueryOptions.DEFAULT.getSerialConsistency(),
+ version,
+ KEYSPACE);
+ }
+}
diff --git a/test/unit/org/apache/cassandra/utils/concurrent/NonBlockingRateLimiterTest.java b/test/unit/org/apache/cassandra/utils/concurrent/NonBlockingRateLimiterTest.java
new file mode 100644
index 0000000..d63cef3
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/concurrent/NonBlockingRateLimiterTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils.concurrent;
+
+import com.google.common.base.Ticker;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link NonBlockingRateLimiter}. Time is supplied by a manually advanced {@link Ticker}, so
+ * permit availability depends only on the explicit clock movements made by each test.
+ */
+@SuppressWarnings("UnstableApiUsage")
+public class NonBlockingRateLimiterTest
+{
+    private static final AtomicLong CLOCK = new AtomicLong(0);
+    private static final TimeUnit DELAY_UNIT = TimeUnit.NANOSECONDS;
+
+    private static final Ticker TICKER = new Ticker()
+    {
+        @Override
+        public long read()
+        {
+            return CLOCK.get();
+        }
+    };
+
+    @Before
+    public void resetTicker()
+    {
+        CLOCK.set(0);
+    }
+
+    @Test
+    public void testUnconditionalReservation()
+    {
+        // 4 permits/second and no burst allowance, so each reservation is spaced by a quarter second.
+        NonBlockingRateLimiter limiter = new NonBlockingRateLimiter(4, 0, TICKER);
+        long oneSecond = DELAY_UNIT.convert(1, TimeUnit.SECONDS);
+        long oneDelay = oneSecond / 4;
+
+        // Delays should begin accumulating without any ticker movement...
+        assertEquals(0, limiter.reserveAndGetDelay(DELAY_UNIT));
+        assertEquals(oneDelay, limiter.reserveAndGetDelay(DELAY_UNIT));
+        assertEquals(oneDelay * 2, limiter.reserveAndGetDelay(DELAY_UNIT));
+        assertEquals(oneDelay * 3, limiter.reserveAndGetDelay(DELAY_UNIT));
+
+        // ...but should be gone after advancing enough to free up a permit.
+        CLOCK.addAndGet(NonBlockingRateLimiter.NANOS_PER_SECOND);
+        assertEquals(0, limiter.reserveAndGetDelay(DELAY_UNIT));
+    }
+
+    @Test
+    public void testConditionalReservation()
+    {
+        NonBlockingRateLimiter limiter = new NonBlockingRateLimiter(1, 0, TICKER);
+
+        // Take the available permit, but then fail a subsequent attempt.
+        assertTrue(limiter.tryReserve());
+        assertFalse(limiter.tryReserve());
+
+        // Advancing a single one-second interval is enough, because the failed attempt did not reserve a permit.
+        CLOCK.addAndGet(NonBlockingRateLimiter.NANOS_PER_SECOND);
+        assertTrue(limiter.tryReserve());
+    }
+
+    @Test
+    public void testBurstPermitConsumption()
+    {
+        // Create a limiter that produces 1 permit/second and allows 1-second bursts.
+        NonBlockingRateLimiter limiter = new NonBlockingRateLimiter(1, NonBlockingRateLimiter.DEFAULT_BURST_NANOS, TICKER);
+
+        // Advance the clock to create a 1-second idle period, which makes one burst permit available.
+        CLOCK.addAndGet(NonBlockingRateLimiter.NANOS_PER_SECOND);
+
+        // Take the burst permit.
+        assertTrue(limiter.tryReserve());
+
+        // Take the "normal" permit.
+        assertTrue(limiter.tryReserve());
+
+        // Then fail, as we've consumed both.
+        assertFalse(limiter.tryReserve());
+
+        // Advance 1 interval again...
+        CLOCK.addAndGet(NonBlockingRateLimiter.NANOS_PER_SECOND);
+
+        // ...and only one permit should be available, as we've reached a steady state.
+        assertTrue(limiter.tryReserve());
+        assertFalse(limiter.tryReserve());
+    }
+
+    // Rates at or above Integer.MAX_VALUE permits/second are rejected by the constructor.
+    @Test(expected = IllegalArgumentException.class)
+    public void testMaximumRate()
+    {
+        new NonBlockingRateLimiter(Integer.MAX_VALUE, 0, Ticker.systemTicker());
+    }
+
+    // Negative rates are rejected by the constructor.
+    @Test(expected = IllegalArgumentException.class)
+    public void testMinimumRate()
+    {
+        new NonBlockingRateLimiter(-1, 0, Ticker.systemTicker());
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]