This is an automated email from the ASF dual-hosted git repository.
samt pushed a commit to branch cassandra-4.1
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-4.1 by this push:
new 6f90e962f5 Enforce CQL message size limit on multiframe messages
6f90e962f5 is described below
commit 6f90e962f54c4b1a90ad6c3dc0bb6a224843abf0
Author: Dmitry Konstantinov <[email protected]>
AuthorDate: Mon Nov 4 21:17:24 2024 +0000
Enforce CQL message size limit on multiframe messages
Patch by Dmitry Konstantinov; reviewed by Sam Tunnicliffe,
Caleb Rackliffe for CASSANDRA-20052
---
CHANGES.txt | 1 +
src/java/org/apache/cassandra/config/Config.java | 1 +
.../cassandra/config/DatabaseDescriptor.java | 52 +++++++
.../exceptions/OversizedCQLMessageException.java | 27 ++++
.../cassandra/net/AbstractMessageHandler.java | 4 +-
.../cassandra/transport/CQLMessageHandler.java | 110 ++++++++++++--
.../cassandra/transport/ExceptionHandlers.java | 5 +
.../transport/InitialConnectionHandler.java | 3 +-
.../cassandra/transport/PipelineConfigurator.java | 2 +
.../apache/cassandra/transport/SimpleClient.java | 1 +
.../transport/AuthMessageSizeLimitTest.java | 104 +++++++++++++
.../transport/ClientResourceLimitsTest.java | 106 ++++---------
.../cassandra/transport/MessageSizeLimitTest.java | 124 ++++++++++++++++
.../transport/NativeProtocolLimitsTestBase.java | 165 +++++++++++++++++++++
.../cassandra/transport/RateLimitingTest.java | 69 +++------
15 files changed, 632 insertions(+), 142 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 3e5597558d..45ef4fe379 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.1.8
+ * Enforce CQL message size limit on multiframe messages (CASSANDRA-20052)
* Add nodetool checktokenmetadata command that checks TokenMetadata is insync
with Gossip endpointState (CASSANDRA-18758)
* Backport Java 11 support for Simulator (CASSANDRA-17178/CASSANDRA-19935)
* Equality check for Paxos.Electorate should not depend on collection types
(CASSANDRA-19935)
diff --git a/src/java/org/apache/cassandra/config/Config.java
b/src/java/org/apache/cassandra/config/Config.java
index d8d1f7e617..b64ff079de 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -269,6 +269,7 @@ public class Config
public int native_transport_max_threads = 128;
@Replaces(oldName = "native_transport_max_frame_size_in_mb", converter =
Converters.MEBIBYTES_DATA_STORAGE_INT, deprecated = true)
public DataStorageSpec.IntMebibytesBound native_transport_max_frame_size =
new DataStorageSpec.IntMebibytesBound("16MiB");
+ public volatile DataStorageSpec.LongBytesBound
native_transport_max_message_size = null;
/** do bcrypt hashing in a limited pool to prevent cpu load spikes; 0
means that all requests will go to default request executor**/
public int native_transport_max_auth_threads = 0;
public volatile long native_transport_max_concurrent_connections = -1L;
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index b85a680cfa..3ff03ce801 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -153,6 +153,9 @@ public class DatabaseDescriptor
private static long counterCacheSizeInMiB;
private static long indexSummaryCapacityInMiB;
+ private static volatile long nativeTransportMaxMessageSizeInBytes;
+ private static volatile boolean
nativeTransportMaxMessageSizeConfiguredExplicitly;
+
private static String localDC;
private static Comparator<Replica> localComparator;
private static EncryptionContext encryptionContext;
@@ -822,6 +825,23 @@ public class DatabaseDescriptor
else if (conf.commitlog_segment_size.toKibibytes() < 2 *
conf.max_mutation_size.toKibibytes())
throw new ConfigurationException("commitlog_segment_size must be
at least twice the size of max_mutation_size / 1024", false);
+ if (conf.native_transport_max_message_size == null)
+ {
+ conf.native_transport_max_message_size = new
DataStorageSpec.LongBytesBound(calculateDefaultNativeTransportMaxMessageSizeInBytes());
+ }
+ else
+ {
+ nativeTransportMaxMessageSizeConfiguredExplicitly = true;
+ long maxCqlMessageSize =
conf.native_transport_max_message_size.toBytes();
+ if (maxCqlMessageSize >
conf.native_transport_max_request_data_in_flight.toBytes())
+ throw new
ConfigurationException("native_transport_max_message_size must not exceed
native_transport_max_request_data_in_flight", false);
+
+ if (maxCqlMessageSize >
conf.native_transport_max_request_data_in_flight_per_ip.toBytes())
+ throw new
ConfigurationException("native_transport_max_message_size must not exceed
native_transport_max_request_data_in_flight_per_ip", false);
+
+ }
+ nativeTransportMaxMessageSizeInBytes =
conf.native_transport_max_message_size.toBytes();
+
// native transport encryption options
if (conf.client_encryption_options != null)
{
@@ -2748,6 +2768,30 @@ public class DatabaseDescriptor
return
conf.native_transport_max_request_data_in_flight_per_ip.toBytes();
}
+ public static long getNativeTransportMaxMessageSizeInBytes()
+ {
+ // the value of native_transport_max_message_size in bytes is cached
+ // to avoid conversion overhead during a parsing of each incoming CQL
message
+ return nativeTransportMaxMessageSizeInBytes;
+ }
+
+ @VisibleForTesting
+ public static void setNativeTransportMaxMessageSizeInBytes(long
maxMessageSizeInBytes)
+ {
+ conf.native_transport_max_message_size = new
DataStorageSpec.LongBytesBound(maxMessageSizeInBytes);
+ nativeTransportMaxMessageSizeInBytes =
conf.native_transport_max_message_size.toBytes();
+ }
+
+ private static long calculateDefaultNativeTransportMaxMessageSizeInBytes()
+ {
+ return Math.min(conf.max_mutation_size.toBytes(),
+ Math.min(
+ conf.native_transport_max_request_data_in_flight.toBytes(),
+
conf.native_transport_max_request_data_in_flight_per_ip.toBytes()
+ )
+ );
+ }
+
public static Config.PaxosVariant getPaxosVariant()
{
return conf.paxos_variant;
@@ -2874,6 +2918,10 @@ public class DatabaseDescriptor
maxRequestDataInFlightInBytes = Runtime.getRuntime().maxMemory() /
40;
conf.native_transport_max_request_data_in_flight_per_ip = new
DataStorageSpec.LongBytesBound(maxRequestDataInFlightInBytes);
+ long newNativeTransportMaxMessageSizeInBytes =
nativeTransportMaxMessageSizeConfiguredExplicitly
+ ?
Math.min(maxRequestDataInFlightInBytes,
getNativeTransportMaxMessageSizeInBytes())
+ :
calculateDefaultNativeTransportMaxMessageSizeInBytes();
+
setNativeTransportMaxMessageSizeInBytes(newNativeTransportMaxMessageSizeInBytes);
}
public static long getNativeTransportMaxRequestDataInFlightInBytes()
@@ -2887,6 +2935,10 @@ public class DatabaseDescriptor
maxRequestDataInFlightInBytes = Runtime.getRuntime().maxMemory() /
10;
conf.native_transport_max_request_data_in_flight = new
DataStorageSpec.LongBytesBound(maxRequestDataInFlightInBytes);
+ long newNativeTransportMaxMessageSizeInBytes =
nativeTransportMaxMessageSizeConfiguredExplicitly
+ ?
Math.min(maxRequestDataInFlightInBytes,
getNativeTransportMaxMessageSizeInBytes())
+ :
calculateDefaultNativeTransportMaxMessageSizeInBytes();
+
setNativeTransportMaxMessageSizeInBytes(newNativeTransportMaxMessageSizeInBytes);
}
public static int getNativeTransportMaxRequestsPerSecond()
diff --git
a/src/java/org/apache/cassandra/exceptions/OversizedCQLMessageException.java
b/src/java/org/apache/cassandra/exceptions/OversizedCQLMessageException.java
new file mode 100644
index 0000000000..60f9c48835
--- /dev/null
+++ b/src/java/org/apache/cassandra/exceptions/OversizedCQLMessageException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.exceptions;
+
+public class OversizedCQLMessageException extends InvalidRequestException
+{
+ public OversizedCQLMessageException(String message)
+ {
+ super(message);
+ }
+}
diff --git a/src/java/org/apache/cassandra/net/AbstractMessageHandler.java
b/src/java/org/apache/cassandra/net/AbstractMessageHandler.java
index e2cf68d6d1..5b5b8b7f1a 100644
--- a/src/java/org/apache/cassandra/net/AbstractMessageHandler.java
+++ b/src/java/org/apache/cassandra/net/AbstractMessageHandler.java
@@ -562,7 +562,7 @@ public abstract class AbstractMessageHandler extends
ChannelInboundHandlerAdapte
return size == received;
}
- private void onIntactFrame(IntactFrame frame)
+ protected void onIntactFrame(IntactFrame frame)
{
boolean expires = approxTime.isAfter(expiresAtNanos);
if (!isExpired && !isCorrupt)
@@ -578,7 +578,7 @@ public abstract class AbstractMessageHandler extends
ChannelInboundHandlerAdapte
isExpired |= expires;
}
- private void onCorruptFrame()
+ protected void onCorruptFrame()
{
if (!isExpired && !isCorrupt)
releaseBuffersAndCapacity(); // release resources once we
transition from normal state to corrupt
diff --git a/src/java/org/apache/cassandra/transport/CQLMessageHandler.java
b/src/java/org/apache/cassandra/transport/CQLMessageHandler.java
index 65c0282908..792f6bf7b6 100644
--- a/src/java/org/apache/cassandra/transport/CQLMessageHandler.java
+++ b/src/java/org/apache/cassandra/transport/CQLMessageHandler.java
@@ -31,6 +31,7 @@ import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.exceptions.OverloadedException;
+import org.apache.cassandra.exceptions.OversizedCQLMessageException;
import org.apache.cassandra.metrics.ClientMessageSizeMetrics;
import org.apache.cassandra.metrics.ClientMetrics;
import org.apache.cassandra.net.AbstractMessageHandler;
@@ -81,6 +82,10 @@ public class CQLMessageHandler<M extends Message> extends
AbstractMessageHandler
public static final int LARGE_MESSAGE_THRESHOLD =
FrameEncoder.Payload.MAX_SIZE - 1;
public static final TimeUnit RATE_LIMITER_DELAY_UNIT =
TimeUnit.NANOSECONDS;
+ static final String MULTI_FRAME_AUTH_ERROR_MESSAGE_PREFIX = "The
connection is not yet in a valid state " +
+ "to process
multi frame CQL Messages, usually this" +
+ "means that
authentication is still pending. ";
+
private final QueueBackpressure queueBackpressure;
private final Envelope.Decoder envelopeDecoder;
private final Message.Decoder<M> messageDecoder;
@@ -94,6 +99,8 @@ public class CQLMessageHandler<M extends Message> extends
AbstractMessageHandler
long channelPayloadBytesInFlight;
private int consecutiveMessageErrors = 0;
+ private final ServerConnection serverConnection;
+
interface MessageConsumer<M extends Message>
{
void dispatch(Channel channel, M message,
Dispatcher.FlushItemConverter toFlushItem, Overload backpressure);
@@ -106,6 +113,7 @@ public class CQLMessageHandler<M extends Message> extends
AbstractMessageHandler
}
CQLMessageHandler(Channel channel,
+ ServerConnection serverConnection,
ProtocolVersion version,
FrameDecoder decoder,
Envelope.Decoder envelopeDecoder,
@@ -128,6 +136,7 @@ public class CQLMessageHandler<M extends Message> extends
AbstractMessageHandler
resources.endpointWaitQueue(),
resources.globalWaitQueue(),
onClosed);
+ this.serverConnection = serverConnection;
this.envelopeDecoder = envelopeDecoder;
this.messageDecoder = messageDecoder;
this.payloadAllocator = payloadAllocator;
@@ -518,10 +527,28 @@ public class CQLMessageHandler<M extends Message> extends
AbstractMessageHandler
// max CQL message size defaults to 256mb, so should be safe to
downcast
int messageSize = Ints.checkedCast(header.bodySizeInBytes);
receivedBytes += buf.remaining();
+
+ if (serverConnection != null && serverConnection.stage() !=
ConnectionStage.READY)
+ {
+ // Disallow any multiframe messages before the connection
reaches the READY state.
+ // This guards against being swamped with oversize messages
from unauthenticated
+ // clients. In this case, we raise a fatal error and close the
connection so it does
+ // not make sense to continue processing subsequent frames
+ handleError(ProtocolException.toFatalException(new
OversizedAuthMessageException(
+ MULTI_FRAME_AUTH_ERROR_MESSAGE_PREFIX +
+ "type = " + header.type + ", size = " +
header.bodySizeInBytes)));
+ ClientMetrics.instance.markRequestDiscarded();
+ return false;
+ }
LargeMessage largeMessage = new LargeMessage(header);
-
- if (throwOnOverload)
+ if (messageSize >
DatabaseDescriptor.getNativeTransportMaxMessageSizeInBytes())
+ {
+ ClientMetrics.instance.markRequestDiscarded();
+ // Mark as too big so that discard the message after consuming
any subsequent frames
+ largeMessage.markTooBig();
+ }
+ else if (throwOnOverload)
{
if (!acquireCapacity(header, endpointReserve, globalReserve))
{
@@ -567,9 +594,9 @@ public class CQLMessageHandler<M extends Message> extends
AbstractMessageHandler
}
}
}
- else
+ else // throwOnOverload = false
{
- if (acquireCapacity(header, endpointReserve, globalReserve))
+ if (acquireCapacityAndQueueOnFailure(header, endpointReserve,
globalReserve))
{
long delay = -1;
Overload backpressure = Overload.NONE;
@@ -605,7 +632,14 @@ public class CQLMessageHandler<M extends Message> extends
AbstractMessageHandler
}
else
{
- noSpamLogger.error("Could not aquire capacity while
processing native protocol message");
+ // we checked previously that messageSize <=
native_transport_max_message_size
+ // and native_transport_max_message_size <=
native_transport_max_request_data_in_flight
+ // and native_transport_max_message_size <=
native_transport_max_request_data_in_flight_per_ip
+ // so, a starvation is not possible for the following case:
+ // a connection is blocked forever if somebody tries to
send a single too big message > total rate limiting capacity.
+ // Once other messages in the same or other CQL
connections are processed and capacity is returned to the limits
+ // we have enough capacity to acquire it for the current
large message.
+ return false;
}
}
@@ -712,6 +746,7 @@ public class CQLMessageHandler<M extends Message> extends
AbstractMessageHandler
private Overload overload = Overload.NONE;
private Overload backpressure = Overload.NONE;
+ private boolean tooBig = false;
private LargeMessage(Envelope.Header header)
{
@@ -747,18 +782,75 @@ public class CQLMessageHandler<M extends Message> extends
AbstractMessageHandler
this.backpressure = backpressure;
}
+ private void markTooBig()
+ {
+ this.tooBig = true;
+ }
+
+ @Override
+ protected void onIntactFrame(IntactFrame frame)
+ {
+ if (tooBig || overload != Overload.NONE)
+ // we do not want to add the frame to buffers (to not consume
a lot of memory and throw it away later
+ // we also do not want to release capacity because we haven't
accuired it
+ frame.consume();
+ else
+ super.onIntactFrame(frame);
+ }
+
+ @Override
+ protected void onCorruptFrame()
+ {
+ if (!isExpired && !isCorrupt && !tooBig)
+ {
+ releaseBuffers(); // release resources once we transition from
normal state to corrupt
+ if (overload != Overload.BYTES_IN_FLIGHT)
+ releaseCapacity(size);
+ }
+ isCorrupt = true;
+ isExpired |= approxTime.isAfter(expiresAtNanos);
+ }
+
+
+ @Override
protected void onComplete()
{
- if (overload != Overload.NONE)
-
handleErrorAndRelease(buildOverloadedException(endpointReserveCapacity,
globalReserveCapacity, overload), header);
+ if (tooBig)
+ // we haven't accuired a capacity for too big messages to
release it
+
handleError(buildOversizedCQLMessageException(header.bodySizeInBytes), header);
+ else if (overload != Overload.NONE)
+ if (overload == Overload.BYTES_IN_FLIGHT)
+ // we haven't accuired a capacity successfully to release
it
+
handleError(buildOverloadedException(endpointReserveCapacity,
globalReserveCapacity, overload), header);
+ else
+
handleErrorAndRelease(buildOverloadedException(endpointReserveCapacity,
globalReserveCapacity, overload), header);
else if (!isCorrupt)
processRequest(assembleFrame(), backpressure);
}
+ @Override
protected void abort()
{
- if (!isCorrupt)
- releaseBuffersAndCapacity(); // release resources if in normal
state when abort() is invoked
+ if (!isCorrupt && !tooBig && overload == Overload.NONE)
+ releaseBuffers();
+
+ if (overload == Overload.NONE || overload ==
Overload.BYTES_IN_FLIGHT)
+ releaseCapacity(size);
+ }
+
+ private OversizedCQLMessageException
buildOversizedCQLMessageException(long messageBodySize)
+ {
+ return new OversizedCQLMessageException("CQL Message of size " +
messageBodySize
+ + " bytes exceeds allowed
maximum of "
+ +
DatabaseDescriptor.getNativeTransportMaxMessageSizeInBytes() + " bytes");
+ }
+ }
+
+ static class OversizedAuthMessageException extends ProtocolException
+ {
+ OversizedAuthMessageException(String message)
+ {
+ super(message);
}
}
}
diff --git a/src/java/org/apache/cassandra/transport/ExceptionHandlers.java
b/src/java/org/apache/cassandra/transport/ExceptionHandlers.java
index 4f063924ea..4d36fa6cd7 100644
--- a/src/java/org/apache/cassandra/transport/ExceptionHandlers.java
+++ b/src/java/org/apache/cassandra/transport/ExceptionHandlers.java
@@ -38,6 +38,7 @@ import io.netty.channel.ChannelPromise;
import io.netty.channel.unix.Errors;
import org.apache.cassandra.exceptions.OverloadedException;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.exceptions.OversizedCQLMessageException;
import org.apache.cassandra.metrics.ClientMetrics;
import org.apache.cassandra.net.FrameEncoder;
import org.apache.cassandra.transport.messages.ErrorMessage;
@@ -130,6 +131,10 @@ public class ExceptionHandlers
// Once the threshold for overload is breached, it will very
likely spam the logs...
NoSpamLogger.log(logger, NoSpamLogger.Level.INFO, 1,
TimeUnit.MINUTES, cause.getMessage());
}
+ else if (Throwables.anyCauseMatches(cause, t -> t instanceof
OversizedCQLMessageException))
+ {
+ NoSpamLogger.log(logger, NoSpamLogger.Level.INFO, 1,
TimeUnit.MINUTES, cause.getMessage());
+ }
else if (Throwables.anyCauseMatches(cause, t -> t instanceof
Errors.NativeIoException))
{
ClientMetrics.instance.markUnknownException();
diff --git
a/src/java/org/apache/cassandra/transport/InitialConnectionHandler.java
b/src/java/org/apache/cassandra/transport/InitialConnectionHandler.java
index 576af3e6dc..463d824c4c 100644
--- a/src/java/org/apache/cassandra/transport/InitialConnectionHandler.java
+++ b/src/java/org/apache/cassandra/transport/InitialConnectionHandler.java
@@ -104,6 +104,7 @@ public class InitialConnectionHandler extends
ByteToMessageDecoder
attrConn.set(connection);
}
assert connection instanceof ServerConnection;
+ ServerConnection serverConnection = (ServerConnection)
connection;
StartupMessage startup = (StartupMessage)
Message.Decoder.decodeMessage(ctx.channel(), inbound);
InetAddress remoteAddress = ((InetSocketAddress)
ctx.channel().remoteAddress()).getAddress();
@@ -120,7 +121,7 @@ public class InitialConnectionHandler extends
ByteToMessageDecoder
if (future.isSuccess())
{
logger.trace("Response to STARTUP sent,
configuring pipeline for {}", inbound.header.version);
- configurator.configureModernPipeline(ctx,
allocator, inbound.header.version, startup.options);
+ configurator.configureModernPipeline(ctx,
serverConnection, allocator, inbound.header.version, startup.options);
allocator.release(inbound.header.bodySizeInBytes);
}
else
diff --git a/src/java/org/apache/cassandra/transport/PipelineConfigurator.java
b/src/java/org/apache/cassandra/transport/PipelineConfigurator.java
index 10ca818719..6ad00471c7 100644
--- a/src/java/org/apache/cassandra/transport/PipelineConfigurator.java
+++ b/src/java/org/apache/cassandra/transport/PipelineConfigurator.java
@@ -270,6 +270,7 @@ public class PipelineConfigurator
}
public void configureModernPipeline(ChannelHandlerContext ctx,
+ ServerConnection serverConnection,
ClientResourceLimits.Allocator
resourceAllocator,
ProtocolVersion version,
Map<String, String> options)
@@ -311,6 +312,7 @@ public class PipelineConfigurator
CQLMessageHandler.MessageConsumer<Message.Request> messageConsumer =
messageConsumer();
CQLMessageHandler<Message.Request> processor =
new CQLMessageHandler<>(ctx.channel(),
+ serverConnection,
version,
frameDecoder,
envelopeDecoder,
diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java
b/src/java/org/apache/cassandra/transport/SimpleClient.java
index f9c9b7f5d7..a7227c89a5 100644
--- a/src/java/org/apache/cassandra/transport/SimpleClient.java
+++ b/src/java/org/apache/cassandra/transport/SimpleClient.java
@@ -504,6 +504,7 @@ public class SimpleClient implements Closeable
CQLMessageHandler<Message.Response> processor =
new CQLMessageHandler<Message.Response>(ctx.channel(),
+ null,
version,
frameDecoder,
envelopeDecoder,
diff --git
a/test/unit/org/apache/cassandra/transport/AuthMessageSizeLimitTest.java
b/test/unit/org/apache/cassandra/transport/AuthMessageSizeLimitTest.java
new file mode 100644
index 0000000000..5eaee511d5
--- /dev/null
+++ b/test/unit/org/apache/cassandra/transport/AuthMessageSizeLimitTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.transport;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.datastax.driver.core.Authenticator;
+import com.datastax.driver.core.EndPoint;
+import com.datastax.driver.core.PlainTextAuthProvider;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.FrameEncoder;
+import org.apache.cassandra.transport.messages.AuthResponse;
+import org.apache.cassandra.transport.messages.QueryMessage;
+import org.assertj.core.api.Assertions;
+
+public class AuthMessageSizeLimitTest extends NativeProtocolLimitsTestBase
+{
+ private static final int TOO_BIG_MULTI_FRAME_AUTH_MESSAGE_SIZE = 2 *
FrameEncoder.Payload.MAX_SIZE;
+
+ // set MAX_CQL_MESSAGE_SIZE bigger than
TOO_BIG_MULTI_FRAME_AUTH_MESSAGE_SIZE to ensure what the auth message size
check is more restrictive
+ private static final int MAX_CQL_MESSAGE_SIZE =
TOO_BIG_MULTI_FRAME_AUTH_MESSAGE_SIZE * 2;
+
+ @BeforeClass
+ public static void setUp()
+ {
+ DatabaseDescriptor.setNativeTransportReceiveQueueCapacityInBytes(1);
+
DatabaseDescriptor.setNativeTransportMaxRequestDataInFlightPerIpInBytes(MAX_CQL_MESSAGE_SIZE);
+
DatabaseDescriptor.setNativeTransportConcurrentRequestDataInFlightInBytes(MAX_CQL_MESSAGE_SIZE);
+
DatabaseDescriptor.setNativeTransportMaxMessageSizeInBytes(MAX_CQL_MESSAGE_SIZE);
+ requireNetwork();
+ requireAuthentication();
+ }
+
+ @Before
+ public void setLimits()
+ {
+ ClientResourceLimits.setGlobalLimit(MAX_CQL_MESSAGE_SIZE);
+ ClientResourceLimits.setEndpointLimit(MAX_CQL_MESSAGE_SIZE);
+ }
+
+ @Test
+ public void sendSmallAuthMessage()
+ {
+ doTest((client) ->
+ {
+ AuthResponse authResponse = createAuthMessage("cassandra",
"cassandra");
+ client.execute(authResponse);
+ createTable(client);
+
+ int valueLessThanMessageMaxSize = MAX_CQL_MESSAGE_SIZE -
500;
+ QueryMessage queryMessage =
queryMessage(valueLessThanMessageMaxSize);
+ client.execute(queryMessage);
+ }
+ );
+ }
+
+ @Test
+ public void sendTooBigAuthMultiFrameMessage()
+ {
+ doTest((client) ->
+ {
+ AuthResponse authResponse = createAuthMessage("cassandra",
createIncorrectLongPassword(TOO_BIG_MULTI_FRAME_AUTH_MESSAGE_SIZE));
+ Assertions.assertThatThrownBy(() ->
client.execute(authResponse))
+ .hasCauseInstanceOf(ProtocolException.class)
+
.hasMessageContaining(CQLMessageHandler.MULTI_FRAME_AUTH_ERROR_MESSAGE_PREFIX);
+ Util.spinAssertEquals(false, () ->
client.connection.channel().isOpen(), 10);
+ }
+ );
+ }
+
+ private static String createIncorrectLongPassword(int length)
+ {
+ StringBuilder password = new StringBuilder(length);
+ for (int i = 0; i < length; i++)
+ password.append('a');
+ return password.toString();
+ }
+
+ private AuthResponse createAuthMessage(String username, String password)
+ {
+ PlainTextAuthProvider authProvider = new
PlainTextAuthProvider(username, password);
+ Authenticator authenticator = authProvider.newAuthenticator((EndPoint)
null, null);
+ return new AuthResponse(authenticator.initialResponse());
+ }
+}
diff --git
a/test/unit/org/apache/cassandra/transport/ClientResourceLimitsTest.java
b/test/unit/org/apache/cassandra/transport/ClientResourceLimitsTest.java
index 8e94997239..1407ab05ac 100644
--- a/test/unit/org/apache/cassandra/transport/ClientResourceLimitsTest.java
+++ b/test/unit/org/apache/cassandra/transport/ClientResourceLimitsTest.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.transport;
-import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
@@ -32,9 +31,6 @@ import org.apache.cassandra.service.StorageService;
import org.junit.*;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.cql3.CQLTester;
-import org.apache.cassandra.cql3.QueryOptions;
-import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.virtual.*;
@@ -50,21 +46,11 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-public class ClientResourceLimitsTest extends CQLTester
+public class ClientResourceLimitsTest extends NativeProtocolLimitsTestBase
{
private static final long LOW_LIMIT = 600L;
private static final long HIGH_LIMIT = 5000000000L;
- private static final QueryOptions V5_DEFAULT_OPTIONS =
- QueryOptions.create(QueryOptions.DEFAULT.getConsistency(),
- QueryOptions.DEFAULT.getValues(),
- QueryOptions.DEFAULT.skipMetadata(),
- QueryOptions.DEFAULT.getPageSize(),
- QueryOptions.DEFAULT.getPagingState(),
- QueryOptions.DEFAULT.getSerialConsistency(),
- ProtocolVersion.V5,
- KEYSPACE);
-
@BeforeClass
public static void setUp()
{
@@ -89,54 +75,6 @@ public class ClientResourceLimitsTest extends CQLTester
ClientResourceLimits.setEndpointLimit(LOW_LIMIT);
}
- @After
- public void dropCreatedTable()
- {
- try
- {
- QueryProcessor.executeOnceInternal("DROP TABLE " + KEYSPACE +
".atable");
- }
- catch (Throwable t)
- {
- // ignore
- }
- }
-
- @SuppressWarnings("resource")
- private SimpleClient client(boolean throwOnOverload)
- {
- try
- {
- return SimpleClient.builder(nativeAddr.getHostAddress(),
nativePort)
- .protocolVersion(ProtocolVersion.V5)
- .useBeta()
- .build()
- .connect(false, throwOnOverload);
- }
- catch (IOException e)
- {
- throw new RuntimeException("Error initializing client", e);
- }
- }
-
- @SuppressWarnings({"resource", "SameParameterValue"})
- private SimpleClient client(boolean throwOnOverload, int
largeMessageThreshold)
- {
- try
- {
- return SimpleClient.builder(nativeAddr.getHostAddress(),
nativePort)
- .protocolVersion(ProtocolVersion.V5)
- .useBeta()
- .largeMessageThreshold(largeMessageThreshold)
- .build()
- .connect(false, throwOnOverload);
- }
- catch (IOException e)
- {
- throw new RuntimeException("Error initializing client", e);
- }
- }
-
@Test
public void testQueryExecutionWithThrowOnOverload()
{
@@ -153,10 +91,8 @@ public class ClientResourceLimitsTest extends CQLTester
{
try (SimpleClient client = client(throwOnOverload))
{
- QueryMessage queryMessage = new QueryMessage("CREATE TABLE atable
(pk int PRIMARY KEY, v text)",
- V5_DEFAULT_OPTIONS);
- client.execute(queryMessage);
- queryMessage = new QueryMessage("SELECT * FROM atable",
V5_DEFAULT_OPTIONS);
+ createTable(client);
+ QueryMessage queryMessage = new QueryMessage("SELECT * FROM
atable", queryOptions());
client.execute(queryMessage);
}
}
@@ -189,7 +125,7 @@ public class ClientResourceLimitsTest extends CQLTester
{
// The first query does not trigger backpressure/pause the
connection:
QueryMessage queryMessage =
- new QueryMessage("CREATE TABLE atable (pk int PRIMARY KEY,
v text)", V5_DEFAULT_OPTIONS);
+ new QueryMessage("CREATE TABLE atable (pk int PRIMARY KEY,
v text)", queryOptions());
Message.Response belowThresholdResponse =
client.execute(queryMessage);
assertEquals(0, getPausedConnectionsGauge().getValue().intValue());
assertNoWarningContains(belowThresholdResponse, "bytes in flight");
@@ -261,7 +197,11 @@ public class ClientResourceLimitsTest extends CQLTester
{
// Bump the per-endpoint limit to make sure we exhaust the global
ClientResourceLimits.setEndpointLimit(HIGH_LIMIT);
- testOverloadedException(() -> client(true, Ints.checkedCast(LOW_LIMIT
/ 2)));
+ // test message = 2/3 x
+ // emulated concurrent message = 2/3 x
+ // test message + emulated concurrent message = 4/3 x > x set as a
global limit
+ emulateInFlightConcurrentMessage(LOW_LIMIT * 2 / 3);
+ testOverloadedException(() -> client(true, Ints.checkedCast(LOW_LIMIT
/ 2)), LOW_LIMIT * 2 / 3);
}
@Test
@@ -269,18 +209,25 @@ public class ClientResourceLimitsTest extends CQLTester
{
// Make sure we can only exceed the per-endpoint limit
ClientResourceLimits.setGlobalLimit(HIGH_LIMIT);
- testOverloadedException(() -> client(true, Ints.checkedCast(LOW_LIMIT
/ 2)));
+ // test message = 2/3 x
+ // emulated concurrent message = 2/3 x
+ // test message + emulated concurrent message = 4/3 x > x set as an
endpoint limit
+ emulateInFlightConcurrentMessage(LOW_LIMIT * 2 / 3);
+ testOverloadedException(() -> client(true, Ints.checkedCast(LOW_LIMIT
/ 2)), LOW_LIMIT * 2 / 3);
}
private void testOverloadedException(Supplier<SimpleClient> clientSupplier)
+ {
+ testOverloadedException(clientSupplier, LOW_LIMIT * 2);
+ }
+
+ private void testOverloadedException(Supplier<SimpleClient>
clientSupplier, long limit)
{
try (SimpleClient client = clientSupplier.get())
{
- QueryMessage queryMessage = new QueryMessage("CREATE TABLE atable
(pk int PRIMARY KEY, v text)",
- V5_DEFAULT_OPTIONS);
- client.execute(queryMessage);
+ createTable(client);
- queryMessage = queryMessage();
+ QueryMessage queryMessage = queryMessage(limit);
try
{
client.execute(queryMessage);
@@ -295,11 +242,7 @@ public class ClientResourceLimitsTest extends CQLTester
private QueryMessage queryMessage()
{
- StringBuilder query = new StringBuilder("INSERT INTO atable (pk, v)
VALUES (1, '");
- for (int i=0; i < LOW_LIMIT * 2; i++)
- query.append('a');
- query.append("')");
- return new QueryMessage(query.toString(), V5_DEFAULT_OPTIONS);
+ return queryMessage(LOW_LIMIT * 2);
}
@Test
@@ -341,7 +284,7 @@ public class ClientResourceLimitsTest extends CQLTester
VirtualKeyspaceRegistry.instance.register(new
VirtualKeyspace(table, ImmutableList.of(vt1)));
final QueryMessage queryMessage = new
QueryMessage(String.format("SELECT * FROM %s.%s", table, table),
-
V5_DEFAULT_OPTIONS);
+ queryOptions());
try
{
Thread tester = new Thread(() -> client.execute(queryMessage));
@@ -367,8 +310,9 @@ public class ClientResourceLimitsTest extends CQLTester
try
{
QueryMessage smallMessage = new QueryMessage(String.format("CREATE
TABLE %s.atable (pk int PRIMARY KEY, v text)", KEYSPACE),
- V5_DEFAULT_OPTIONS);
+ queryOptions());
client.execute(smallMessage);
+ createTable(client);
try
{
client.execute(queryMessage());
diff --git a/test/unit/org/apache/cassandra/transport/MessageSizeLimitTest.java
b/test/unit/org/apache/cassandra/transport/MessageSizeLimitTest.java
new file mode 100644
index 0000000000..8b7b2999f4
--- /dev/null
+++ b/test/unit/org/apache/cassandra/transport/MessageSizeLimitTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.transport;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.net.FrameEncoder;
+import org.apache.cassandra.transport.messages.QueryMessage;
+import org.assertj.core.api.Assertions;
+
+public class MessageSizeLimitTest extends NativeProtocolLimitsTestBase
+{
+ private static final int MAX_CQL_MESSAGE_SIZE =
FrameEncoder.Payload.MAX_SIZE * 3;
+ private static final int TOO_BIG_MESSAGE_SIZE = MAX_CQL_MESSAGE_SIZE * 2;
+ private static final int NORMAL_MESSAGE_SIZE = MAX_CQL_MESSAGE_SIZE - 500;
+
+ @BeforeClass
+ public static void setUp()
+ {
+ DatabaseDescriptor.setNativeTransportReceiveQueueCapacityInBytes(1);
+
DatabaseDescriptor.setNativeTransportMaxRequestDataInFlightPerIpInBytes(MAX_CQL_MESSAGE_SIZE);
+
DatabaseDescriptor.setNativeTransportConcurrentRequestDataInFlightInBytes(MAX_CQL_MESSAGE_SIZE);
+
DatabaseDescriptor.setNativeTransportMaxMessageSizeInBytes(MAX_CQL_MESSAGE_SIZE);
+ requireNetwork();
+ }
+
+ @Before
+ public void setLimits()
+ {
+ ClientResourceLimits.setGlobalLimit(MAX_CQL_MESSAGE_SIZE);
+ ClientResourceLimits.setEndpointLimit(MAX_CQL_MESSAGE_SIZE);
+ }
+
+ @Test
+ public void sendMessageWithSizeMoreThanMaxMessageSize()
+ {
+ runClientLogic((client) ->
+ {
+ QueryMessage tooBigQueryMessage =
queryMessage(TOO_BIG_MESSAGE_SIZE);
+ Assertions.assertThatThrownBy(() ->
client.execute(tooBigQueryMessage))
+
.hasCauseInstanceOf(InvalidRequestException.class);
+ // InvalidRequestException: CQL Message of size 524362
bytes exceeds allowed maximum of 262144 bytes
+
+ // we send one more message to check that the server
continues to process new messages in the opened connection
+ QueryMessage queryMessage =
queryMessage(NORMAL_MESSAGE_SIZE);
+ client.execute(queryMessage);
+ }
+ );
+ }
+
+ @Test(timeout = 30_000)
+ public void checkThatThereIsNoStarvationForMultiFrameMessages() throws
InterruptedException
+ {
+ runClientLogic((client) -> {}, true); // to create table
+ AtomicInteger completedSuccessfully = new AtomicInteger(0);
+ int threadsCount = 2;
+ List<Thread> threads = new ArrayList<>();
+ for (int i = 0; i < threadsCount; i++)
+ {
+ threads.add(new Thread(() -> runClientLogic((client) -> {
+ sendMessages(client, 100, NORMAL_MESSAGE_SIZE);
+ completedSuccessfully.incrementAndGet();
+ }, false))
+ );
+ }
+ for (Thread thread : threads)
+ thread.start();
+
+ for (Thread thread : threads)
+ thread.join();
+
+ Assert.assertEquals("not all messages were sent successfully by all
threads",
+ threadsCount, completedSuccessfully.get());
+ }
+
+ private void sendMessages(SimpleClient client, int messagesCount, int
messageSize)
+ {
+ for (int i = 0; i < messagesCount; i++)
+ {
+ QueryMessage queryMessage1 = queryMessage(messageSize);
+ client.execute(queryMessage1);
+ }
+ }
+
+ @Test
+ public void sendMessageWithSizeBelowLimit()
+ {
+ runClientLogic((client) ->
+ {
+ QueryMessage queryMessage =
queryMessage(NORMAL_MESSAGE_SIZE);
+ client.execute(queryMessage);
+
+ // run one more time, to validate that the connection is
still alive
+ queryMessage = queryMessage(NORMAL_MESSAGE_SIZE);
+ client.execute(queryMessage);
+ }
+ );
+ }
+}
diff --git
a/test/unit/org/apache/cassandra/transport/NativeProtocolLimitsTestBase.java
b/test/unit/org/apache/cassandra/transport/NativeProtocolLimitsTestBase.java
new file mode 100644
index 0000000000..221bf97b23
--- /dev/null
+++ b/test/unit/org/apache/cassandra/transport/NativeProtocolLimitsTestBase.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.transport;
+
+import java.io.IOException;
+
+import org.junit.After;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.net.FrameEncoder;
+import org.apache.cassandra.transport.messages.QueryMessage;
+import org.apache.cassandra.utils.FBUtilities;
+
+public abstract class NativeProtocolLimitsTestBase extends CQLTester
+{
+ protected final ProtocolVersion version;
+
+ protected long emulatedUsedCapacity;
+
+ public NativeProtocolLimitsTestBase()
+ {
+ this(ProtocolVersion.V5);
+ }
+
+ public NativeProtocolLimitsTestBase(ProtocolVersion version)
+ {
+ this.version = version;
+ }
+
+ @After
+ public void dropCreatedTable()
+ {
+ if (emulatedUsedCapacity > 0)
+ {
+ releaseEmulatedCapacity(emulatedUsedCapacity);
+ }
+ try
+ {
+ QueryProcessor.executeOnceInternal("DROP TABLE " + KEYSPACE +
".atable");
+ }
+ catch (Throwable t)
+ {
+ // ignore
+ }
+ }
+
+ public QueryOptions queryOptions()
+ {
+ return QueryOptions.create(QueryOptions.DEFAULT.getConsistency(),
+ QueryOptions.DEFAULT.getValues(),
+ QueryOptions.DEFAULT.skipMetadata(),
+ QueryOptions.DEFAULT.getPageSize(),
+ QueryOptions.DEFAULT.getPagingState(),
+ QueryOptions.DEFAULT.getSerialConsistency(),
+ version,
+ KEYSPACE);
+ }
+
+ public SimpleClient client()
+ {
+ return client(false);
+ }
+
+ @SuppressWarnings({"resource", "SameParameterValue"})
+ public SimpleClient client(boolean throwOnOverload)
+ {
+ return client(throwOnOverload, FrameEncoder.Payload.MAX_SIZE);
+ }
+
+ @SuppressWarnings({"resource", "SameParameterValue"})
+ public SimpleClient client(boolean throwOnOverload, int
largeMessageThreshold)
+ {
+ try
+ {
+ return SimpleClient.builder(nativeAddr.getHostAddress(),
nativePort)
+ .protocolVersion(version)
+ .useBeta()
+ .largeMessageThreshold(largeMessageThreshold)
+ .build()
+ .connect(false, throwOnOverload);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException("Error initializing client", e);
+ }
+ }
+
+ public void runClientLogic(ClientLogic clientLogic)
+ {
+ runClientLogic(clientLogic, true);
+ }
+
+ public void runClientLogic(ClientLogic clientLogic, boolean createTable)
+ {
+ try (SimpleClient client = client())
+ {
+ if (createTable)
+ createTable(client);
+ clientLogic.run(client);
+ }
+ }
+
+ public void createTable(SimpleClient client)
+ {
+ QueryMessage queryMessage = new QueryMessage("CREATE TABLE IF NOT
EXISTS " +
+ KEYSPACE + ".atable (pk
int PRIMARY KEY, v text)",
+ queryOptions());
+ client.execute(queryMessage);
+ }
+
+ public void doTest(ClientLogic testLogic)
+ {
+ try (SimpleClient client = client())
+ {
+ testLogic.run(client);
+ }
+ }
+ public interface ClientLogic
+ {
+ void run(SimpleClient simpleClient);
+ }
+
+ public QueryMessage queryMessage(long valueSize)
+ {
+ StringBuilder query = new StringBuilder("INSERT INTO " + KEYSPACE +
".atable (pk, v) VALUES (1, '");
+ for (int i = 0; i < valueSize; i++)
+ query.append('a');
+ query.append("')");
+ return new QueryMessage(query.toString(), queryOptions());
+ }
+
+ protected void emulateInFlightConcurrentMessage(long length)
+ {
+ ClientResourceLimits.Allocator allocator =
ClientResourceLimits.getAllocatorForEndpoint(FBUtilities.getJustLocalAddress());
+ ClientResourceLimits.ResourceProvider resourceProvider = new
ClientResourceLimits.ResourceProvider.Default(allocator);
+ resourceProvider.globalLimit().allocate(length);
+ resourceProvider.endpointLimit().allocate(length);
+ emulatedUsedCapacity += length;
+ }
+
+ protected void releaseEmulatedCapacity(long length)
+ {
+ ClientResourceLimits.Allocator allocator =
ClientResourceLimits.getAllocatorForEndpoint(FBUtilities.getJustLocalAddress());
+ ClientResourceLimits.ResourceProvider resourceProvider = new
ClientResourceLimits.ResourceProvider.Default(allocator);
+ resourceProvider.globalLimit().release(length);
+ resourceProvider.endpointLimit().release(length);
+ }
+}
diff --git a/test/unit/org/apache/cassandra/transport/RateLimitingTest.java
b/test/unit/org/apache/cassandra/transport/RateLimitingTest.java
index 0b3fb34a79..1e8e16a662 100644
--- a/test/unit/org/apache/cassandra/transport/RateLimitingTest.java
+++ b/test/unit/org/apache/cassandra/transport/RateLimitingTest.java
@@ -28,7 +28,9 @@ import java.util.stream.Collectors;
import com.codahale.metrics.Meter;
import com.google.common.base.Ticker;
+
import org.awaitility.Awaitility;
+
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -36,12 +38,9 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.cql3.CQLTester;
-import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.exceptions.OverloadedException;
import org.apache.cassandra.metrics.CassandraMetricsRegistry;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.transport.messages.QueryMessage;
import org.apache.cassandra.utils.Throwables;
import static org.junit.Assert.assertEquals;
@@ -54,7 +53,7 @@ import static
org.apache.cassandra.transport.ProtocolVersion.V4;
@SuppressWarnings("UnstableApiUsage")
@RunWith(Parameterized.class)
-public class RateLimitingTest extends CQLTester
+public class RateLimitingTest extends NativeProtocolLimitsTestBase
{
public static final String BACKPRESSURE_WARNING_SNIPPET = "Request
breached global limit";
@@ -63,9 +62,6 @@ public class RateLimitingTest extends CQLTester
private static final long MAX_LONG_CONFIG_VALUE = Long.MAX_VALUE - 1;
- @Parameterized.Parameter
- public ProtocolVersion version;
-
@Parameterized.Parameters(name="{0}")
public static Collection<Object[]> versions()
{
@@ -74,6 +70,11 @@ public class RateLimitingTest extends CQLTester
.collect(Collectors.toList());
}
+ public RateLimitingTest(ProtocolVersion version)
+ {
+ super(version);
+ }
+
private AtomicLong tick;
private Ticker ticker;
@@ -105,6 +106,7 @@ public class RateLimitingTest extends CQLTester
ClientResourceLimits.setGlobalLimit(MAX_LONG_CONFIG_VALUE);
}
+
@Test
public void shouldThrowOnOverloadSmallMessages() throws Exception
{
@@ -147,15 +149,19 @@ public class RateLimitingTest extends CQLTester
private void testBytesInFlightOverload(int payloadSize) throws Exception
{
- try (SimpleClient client = client().connect(false, true))
+ int emulatedConcurrentMessageSize = payloadSize * 3 / 2;
+ try (SimpleClient client = client(true, LARGE_PAYLOAD_THRESHOLD_BYTES))
{
StorageService.instance.setNativeTransportRateLimitingEnabled(false);
- QueryMessage queryMessage = new QueryMessage("CREATE TABLE IF NOT
EXISTS " + KEYSPACE + ".atable (pk int PRIMARY KEY, v text)", queryOptions());
- client.execute(queryMessage);
+ createTable(client);
StorageService.instance.setNativeTransportRateLimitingEnabled(true);
ClientResourceLimits.GLOBAL_REQUEST_LIMITER.setRate(OVERLOAD_PERMITS_PER_SECOND,
ticker);
- ClientResourceLimits.setGlobalLimit(1);
+ // test message = 1x
+ // emulated concurrent message = 1.5x
+ // test message + emulated concurrent message = 2.5x > 2x set as a
global limit
+ ClientResourceLimits.setGlobalLimit(payloadSize * 2);
+ emulateInFlightConcurrentMessage(emulatedConcurrentMessageSize);
try
{
@@ -170,18 +176,17 @@ public class RateLimitingTest extends CQLTester
finally
{
// Sanity check bytes in flight limiter.
- Awaitility.await().untilAsserted(() -> assertEquals(0,
ClientResourceLimits.getCurrentGlobalUsage()));
+ Awaitility.await().untilAsserted(() ->
assertEquals(emulatedConcurrentMessageSize,
ClientResourceLimits.getCurrentGlobalUsage()));
StorageService.instance.setNativeTransportRateLimitingEnabled(false);
}
}
private void testOverload(int payloadSize, boolean throwOnOverload) throws
Exception
{
- try (SimpleClient client = client().connect(false, throwOnOverload))
+ try (SimpleClient client = client(throwOnOverload,
LARGE_PAYLOAD_THRESHOLD_BYTES))
{
StorageService.instance.setNativeTransportRateLimitingEnabled(false);
- QueryMessage queryMessage = new QueryMessage("CREATE TABLE IF NOT
EXISTS " + KEYSPACE + ".atable (pk int PRIMARY KEY, v text)", queryOptions());
- client.execute(queryMessage);
+ createTable(client);
StorageService.instance.setNativeTransportRateLimitingEnabled(true);
ClientResourceLimits.GLOBAL_REQUEST_LIMITER.setRate(OVERLOAD_PERMITS_PER_SECOND,
ticker);
@@ -286,40 +291,6 @@ public class RateLimitingTest extends CQLTester
assertEquals(dispatchedPrior + 2,
getRequestDispatchedMeter().getCount());
}
- private QueryMessage queryMessage(int length)
- {
- StringBuilder query = new StringBuilder("INSERT INTO " + KEYSPACE +
".atable (pk, v) VALUES (1, '");
-
- for (int i = 0; i < length; i++)
- {
- query.append('a');
- }
-
- query.append("')");
- return new QueryMessage(query.toString(), queryOptions());
- }
-
- private SimpleClient client()
- {
- return SimpleClient.builder(nativeAddr.getHostAddress(), nativePort)
- .protocolVersion(version)
- .useBeta()
-
.largeMessageThreshold(LARGE_PAYLOAD_THRESHOLD_BYTES)
- .build();
- }
-
- private QueryOptions queryOptions()
- {
- return QueryOptions.create(QueryOptions.DEFAULT.getConsistency(),
- QueryOptions.DEFAULT.getValues(),
- QueryOptions.DEFAULT.skipMetadata(),
- QueryOptions.DEFAULT.getPageSize(),
- QueryOptions.DEFAULT.getPagingState(),
- QueryOptions.DEFAULT.getSerialConsistency(),
- version,
- KEYSPACE);
- }
-
protected static Meter getRequestDispatchedMeter()
{
String metricName =
"org.apache.cassandra.metrics.Client.RequestDispatched";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]