[ 
https://issues.apache.org/jira/browse/CASSANDRA-20052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17896722#comment-17896722
 ] 

Dmitry Konstantinov edited comment on CASSANDRA-20052 at 11/8/24 4:35 PM:
--------------------------------------------------------------------------

{quote}I don't quite see why though, it's exactly the same as the 
throwOnOverload case. We know by reading the first frame that we are overloaded 
so we decide we won't process the message and set a flag to that effect right 
away (we should just release  the subsequent frames as we read them). How is 
this different? I'm just saying we should do that regardless of throwOnOverload.
{quote}
I meant the legacy logic which we had before the frames concept was introduced. 
Now the frames are read and processed in 2 steps:
1) we read frames from the channel, parse them and accumulate them in 
org.apache.cassandra.net.FrameDecoder#frames to process later
2) we process the accumulated frames to extract messages from them 
(org.apache.cassandra.net.FrameDecoder#deliver)
So, at the moment we parse the first frame as part of extracting a message, many 
frames may already have been read from the channel and stored in memory 
(limited by the amount of data Netty allows to read in one event loop round).
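A minimal, self-contained sketch of this two-step flow (illustrative only, not 
the actual FrameDecoder code; the class and method names below are simplified):
{code:java}
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Illustrative sketch only: frames are first accumulated (step 1), then delivered (step 2).
final class TwoStepFrameDecoderSketch
{
    // step 1 target: frames parsed from the channel are buffered here
    private final Queue<byte[]> frames = new ArrayDeque<>();

    // Step 1: called for every channel read; frames are parsed and accumulated,
    // so many frames can already sit in memory before any message is assembled.
    void channelRead(byte[] parsedFrame)
    {
        frames.add(parsedFrame);
    }

    // Step 2: the accumulated frames are drained and messages are extracted from them.
    void deliver(Consumer<byte[]> messageProcessor)
    {
        byte[] frame;
        while ((frame = frames.poll()) != null)
            messageProcessor.accept(frame); // in reality several frames may form one large message
    }
}
{code}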

In any case it is not a blocker for implementing what you are suggesting, it was 
just a side note.
{quote}After the {{acquireCapacity}} check, rate limiting is based on request 
rate/time in queue and not on message size, so I don't see what you mean here. 
If we receive a stream of large messages, we'll process them until we cannot 
acquire capacity and then we'll start rejecting them (assuming we implement 
what I said in the previous paragraphs). Rate limiting doesn't really come into 
it at this point
{quote}
{quote}I am mostly trying to understand whether there's a valid reason to 
introduce another configuration option specifically for max message size. If 
the server has capacity to process a message and any mutations in it are within 
the acceptable size boundaries, do we really need another setting in yaml?
{quote}
Let me use an example to explain my concerns. Let's assume the following 
configuration:
 * native_transport_max_request_data_in_flight = 256 MiB (for example, we have a 
2560 MiB heap)
 * native_transport_receive_queue_capacity = 1 MiB (default)
 * native_transport_max_request_data_in_flight_per_ip - for simplicity let's 
forget about this limit for a while
 * max_mutation_size = 32 MiB
 * +throwOnOverload = false+ (the default mode which I suppose almost everybody 
uses)

1st case:
 * a client sent 1 large message with size = 512 MiB. It is bigger than both the 
total rate limit/capacity (native_transport_max_request_data_in_flight + 
native_transport_receive_queue_capacity) and max_mutation_size.
 * expected behaviour: return an error to the client 
(InvalidRequestException/the message is too big)

2nd case:
 * a client sent 1 large message with size = 64 MiB. It is less than the total rate 
limit but bigger than max_mutation_size.
 * expected behaviour: return an error to the client 
(InvalidRequestException/the message is too big)

3rd case:
 * a client (or several clients) sent 32 messages with size = 16 MiB each. Our total 
capacity = native_transport_max_request_data_in_flight + 
native_transport_receive_queue_capacity = 257 MiB, so we can acquire capacity 
for 16 messages (16 x 16 MiB = 256 MiB), nearly consuming the limit (1 MiB 
remains). For the remaining 16 messages we want to pause processing and resume 
it when the in-flight messages have been processed and some acquired capacity 
has been released. We do not want to return an error for any of the 32 messages 
in this case (the arithmetic is spelled out in the sketch below).
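A worked check of the numbers for the three cases (an illustrative sketch, not 
server code; the constants mirror the example configuration above):
{code:java}
public class CapacityCasesSketch
{
    static final long MIB = 1024L * 1024L;
    // values from the example configuration above
    static final long IN_FLIGHT_LIMIT   = 256 * MIB; // native_transport_max_request_data_in_flight
    static final long RECEIVE_QUEUE_CAP = 1 * MIB;   // native_transport_receive_queue_capacity
    static final long TOTAL_CAPACITY    = IN_FLIGHT_LIMIT + RECEIVE_QUEUE_CAP; // 257 MiB
    static final long MAX_MUTATION_SIZE = 32 * MIB;  // max_mutation_size

    public static void main(String[] args)
    {
        // 1st case: a 512 MiB message exceeds both limits -> reject early
        System.out.println(512 * MIB > TOTAL_CAPACITY && 512 * MIB > MAX_MUTATION_SIZE); // true
        // 2nd case: a 64 MiB message fits the capacity but exceeds max_mutation_size -> reject early
        System.out.println(64 * MIB < TOTAL_CAPACITY && 64 * MIB > MAX_MUTATION_SIZE);   // true
        // 3rd case: 32 x 16 MiB messages; only 16 can acquire capacity at once,
        // the remaining ones should be back-pressured, not rejected
        System.out.println(TOTAL_CAPACITY / (16 * MIB)); // 16
    }
}
{code}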

So, for me the cases are different:
 * 1st and 2nd case: max message size limit - protection by rejecting early
 * 3rd case: rate limiting - by flow control/back-pressure

By implementing the logic using the approach mentioned above:
{code:java}
if (!acquireCapacity(header, endpointReserve, globalReserve) || messageSize > 
IMutation.MAX_MUTATION_SIZE)
          // read subsequent frames, but don't buffer them. 
          // When done, return error response (either OverloadedException 
          // or InvalidRequestException, depending on connection config). {code}
we will make rate limiting fail for the 3rd case, while the expectation for this 
case, I suppose, is not to fail.
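A sketch of how the two concerns could be kept separate (illustrative pseudocode 
in the same spirit as the snippet above, not an actual patch; MAX_MESSAGE_SIZE 
stands for the proposed per-message limit):
{code:java}
// Illustrative pseudocode only: separate the "message too big" check from capacity acquisition.
if (messageSize > MAX_MESSAGE_SIZE)
{
    // Check 1: max message size -> protection by rejecting early (1st and 2nd cases).
    // Read and release the subsequent frames without buffering them, then return an error
    // to the client (InvalidRequestException / the message is too big).
}
else if (!acquireCapacity(header, endpointReserve, globalReserve))
{
    // Check 2: rate limiting (3rd case). With throwOnOverload = false, do not return an error:
    // apply flow control / back-pressure and resume once in-flight messages release capacity.
    // With throwOnOverload = true, respond with OverloadedException instead.
}
{code}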


> Size of CQL messages is not limited in V5 protocol logic
> --------------------------------------------------------
>
>                 Key: CASSANDRA-20052
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-20052
>             Project: Cassandra
>          Issue Type: Bug
>          Components: Messaging/Client
>            Reporter: Dmitry Konstantinov
>            Assignee: Dmitry Konstantinov
>            Priority: Normal
>         Attachments: cassandra_rate_limit.svg
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> Size of CQL messages is not limited in V5 protocol logic
>  - After the introduction of v5 frames we do not have any CQL message limit 
> anymore; native_transport_max_frame_size_in_mb, which enforced such a limit in 
> the pre-V5 epoch, now applies only to pre-V5 protocol sessions, otherwise it is 
> applied only to the initial STARTUP/OPTIONS messages handling and is not 
> checked in any v5 logic. So, currently a v5 CQL message of any size can be 
> sent to the Cassandra server.
>  - The overload logic just allows huge messages to be processed for free to 
> avoid starvation, so it does not provide any protection against the most 
> dangerous requests from a memory pressure point of view.
>  - The situation is even more dangerous: the v5 framing logic is enabled right 
> after the AUTH response, so we do not limit message size even for AUTH_RESPONSE 
> messages from a client. This can be used as a DoS attack: a non-authenticated 
> client can send a huge username/password to the Cassandra server to cause 
> trouble with GC or even kill it.
> An easy example:
> {code:java}
> import java.net.InetSocketAddress;
> import java.util.Arrays;
>
> import com.datastax.oss.driver.api.core.CqlSession;
> import com.datastax.oss.driver.internal.core.metadata.DefaultEndPoint;
>
> public class TestBigAuthRequest {
>     public static void main(String[] args) {
>         String password = getString(500_000_000, '-');
>         try (CqlSession session = CqlSession.builder()
>                 .addContactEndPoint(new DefaultEndPoint(new 
> InetSocketAddress("localhost", 9042)))
>                 .withAuthCredentials("cassandra", password)
>                 .withLocalDatacenter("datacenter1")
>                 .build()) {
>             session.execute("select * from system.local");
>         }
>     }
>     private static String getString(int length, char charToFill) {
>         if (length > 0) {
>             char[] array = new char[length];
>             Arrays.fill(array, charToFill);
>             return new String(array);
>         }
>         return "";
>     }
> }
> {code}
> A thread stack of such invocation (captured to show the execution flow):
> {code:java}
> "nioEventLoopGroup-5-21@9164" prio=10 tid=0x86 nid=NA runnable
>   java.lang.Thread.State: RUNNABLE
>         at 
> org.apache.cassandra.transport.messages.AuthResponse$1.decode(AuthResponse.java:45)
>         at 
> org.apache.cassandra.transport.messages.AuthResponse$1.decode(AuthResponse.java:39)
>         at 
> org.apache.cassandra.transport.Message$Decoder.decodeMessage(Message.java:432)
>         at 
> org.apache.cassandra.transport.Message$Decoder$RequestDecoder.decode(Message.java:467)
>         at 
> org.apache.cassandra.transport.Message$Decoder$RequestDecoder.decode(Message.java:459)
>         at 
> org.apache.cassandra.transport.CQLMessageHandler.processRequest(CQLMessageHandler.java:377)
>         at 
> org.apache.cassandra.transport.CQLMessageHandler$LargeMessage.onComplete(CQLMessageHandler.java:755)
>         at 
> org.apache.cassandra.net.AbstractMessageHandler$LargeMessage.supply(AbstractMessageHandler.java:561)
>         at 
> org.apache.cassandra.net.AbstractMessageHandler.processSubsequentFrameOfLargeMessage(AbstractMessageHandler.java:257)
>         at 
> org.apache.cassandra.net.AbstractMessageHandler.processIntactFrame(AbstractMessageHandler.java:229)
>         at 
> org.apache.cassandra.net.AbstractMessageHandler.process(AbstractMessageHandler.java:216)
>         at 
> org.apache.cassandra.transport.CQLMessageHandler.process(CQLMessageHandler.java:147)
>         at 
> org.apache.cassandra.net.FrameDecoder.deliver(FrameDecoder.java:330)
>         at 
> org.apache.cassandra.net.FrameDecoder.channelRead(FrameDecoder.java:294)
>         at 
> org.apache.cassandra.net.FrameDecoder.channelRead(FrameDecoder.java:277)
>         at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>         at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>         at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>         at 
> io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
>         at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>         at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>         at 
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
>         at 
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
>         at 
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
>         at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
>         at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
>         at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
>         at 
> io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>         at 
> io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>         at 
> io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
>         at java.lang.Thread.run(Thread.java:829)
> {code}
> The provided MR (https://github.com/apache/cassandra/pull/3655) contains a 
> fix for the issue which introduces 2 new parameters:
> native_transport_max_message_size - to limit the size of any CQL message
> native_transport_max_auth_message_size (default = 128KiB) - to limit the auth 
> response message size more strictly and add extra protection against a 
> possible DoS attack.
> Design questions:
>  * The current implementation closes a CQL connection if a message is bigger 
> than the limits. A skip-message-body logic could be implemented to keep using 
> the connection, but it is more complicated and error prone.
>  * The tricky question is the default value for 
> native_transport_max_message_size:
> on one side - we want it to be not more than 
> min(native_transport_max_request_data_in_flight_per_ip, 
> native_transport_max_request_data_in_flight) to reduce the chance of invoking 
> the branch of logic where error handling does not work;
> on the other side - min(native_transport_max_request_data_in_flight_per_ip, 
> native_transport_max_request_data_in_flight) can be too small and there is a 
> chance of breaking backward compatibility for existing deployments where 
> people use large messages and small heaps (while that is not a good idea).
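> A minimal sketch of the kind of bound being discussed (illustrative only; the 
> helper and its name are hypothetical, not necessarily how the actual patch 
> validates this):
> {code:java}
> // Illustrative only: bound the per-message limit by the in-flight capacity limits,
> // so the branch of logic where error handling does not work is never reached.
> final class MaxMessageSizeBoundSketch
> {
>     static long validate(long maxMessageSize, long perIpInFlight, long globalInFlight)
>     {
>         if (maxMessageSize > Math.min(perIpInFlight, globalInFlight))
>             throw new IllegalArgumentException(
>                 "native_transport_max_message_size must not exceed " +
>                 "native_transport_max_request_data_in_flight(_per_ip)");
>         return maxMessageSize;
>     }
> }
> {code}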
> Related observations:
> 1) https://issues.apache.org/jira/browse/CASSANDRA-16886 - Reduce 
> native_transport_max_frame_size_in_mb (from 256M to 16M)
> 2) Corresponding logic exists for the Cassandra server internode protocol: a 
> message size limit exists there and it is validated to be not larger than the 
> rate limiting (reserve capacity) parameters:
> internode_max_message_size = 
> min(internode_application_receive_queue_reserve_endpoint_capacity, 
> internode_application_send_queue_reserve_endpoint_capacity)
> internode_application_receive_queue_reserve_endpoint_capacity = 128MiB
> internode_application_send_queue_reserve_endpoint_capacity = 128MiB
> internode_max_message_size <= 
> internode_application_receive_queue_reserve_endpoint_capacity
> internode_max_message_size <= 
> internode_application_receive_queue_reserve_global_capacity
> internode_max_message_size <= 
> internode_application_send_queue_reserve_endpoint_capacity
> internode_max_message_size <= 
> internode_application_send_queue_reserve_global_capacity
> 3) Request types according to CQL specification:
> 4.1.1. STARTUP, in normal cases should be small
> 4.1.2. AUTH_RESPONSE, in normal cases should be small
> 4.1.3. OPTIONS, in normal cases should be small
> 4.1.4. QUERY, in normal cases should be small
> 4.1.5. PREPARE, in normal cases should be small
> 4.1.6. EXECUTE <-- potentially large in case of inserts, max_mutation_size = 
> commitlog_segment_size / 2; where commitlog_segment_size_in_mb = 32MiB
> 4.1.7. BATCH <-- potentially large, max_mutation_size = 
> commitlog_segment_size / 2; where commitlog_segment_size_in_mb = 32MiB
> 4.1.8. REGISTER, in normal cases should be small


