[
https://issues.apache.org/jira/browse/CASSANDRA-20052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17896467#comment-17896467
]
Dmitry Konstantinov edited comment on CASSANDRA-20052 at 11/7/24 8:44 PM:
--------------------------------------------------------------------------
Hi [~samt]
Thank you for your answer!
Please find my comments below.
Note: in the comments below I refer to the internode messaging logic several times
for the following reason: client messaging and internode messaging have a common
part (AbstractMessageHandler); internode messaging already has a max message size
check, so for consistency I tried to use the same approach and configuration
style for client messaging as we already have in internode messaging.
----
{quote}it seems to me that the existing mechanisms should almost be enough to
guard against it, I think it's mainly that there are bugs in the implementation.
{quote}
I would slightly differentiate the cases:
1) rate limiting - we want to apply back-pressure to a message flow, ideally in a
way that is transparent (without errors) for application developers. Even with
throwOnOverload = false the request is still expected to be valid and retryable
(normally a driver/client is expected to retry later).
2) max message size limit - we want to protect our server from an incorrect,
potentially dangerous request, and such a request will not be considered valid
if it is retried. Here we want to fail, and fail as early as possible, to
reduce the amount of resources we spend on such requests.
So, yes, some of the existing code infrastructure definitely can and should be
re-used here, but I am not sure that treating the max message size limit as a
special case of the existing rate limiting is the best way, especially taking into
account that the current rate limiting logic (throwOnOverload = false) is more of a
post-processing action while the message size limit is a pre-processing action.
----
{quote}The core of the problem is the behaviour with large messages where we
always try to consume the entire message to avoid blocking the client. In
processFirstFrameOfLargeMessage, regardless of the throwOnOverload setting, if
we cannot acquire capacity to process the entire message, we buffer the initial
frame by calling largeMessage.supply and return true to indicate that the
pipeline should continue to read bytes from netty buffers. Subsequent frames of
the message are then read and also buffered without any further checking, all
of which seems obviously incorrect.
{quote}
Yes, I agree. Here is a diagram which I reverse-engineered from the source code
some time ago. I am focusing on the throwOnOverload = false branch because it is
the default and most commonly used mode (if I remember correctly, the DataStax
Java driver does not even properly support the throwOnOverload = true mode, but
my memory may be out of date).
[^cassandra_rate_limit.svg]
The tricky thing with the current rate limiting logic is that we read frames into
memory first and only then decide whether we need to apply back-pressure (so it
is applied to the next messages, not the current one). As the current CQL message
is already loaded into memory, the best the current logic can do is to continue
processing it. This works fine for small messages (the impact of letting a small
message through is negligible) but it can be an issue for large ones. Ideally we
would read the message length first and then decide whether we have the capacity
to process it, but the introduction of frames makes that quite complicated to
implement, so we read frames first and then decide what to do.
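To make that flow easier to follow, here is a much-simplified sketch of how I read
the current behaviour; the class and method names are illustrative only, this is not
the actual CQLMessageHandler code:
{code:java}
// Much-simplified sketch of the current large-message flow (illustrative names only).
final class LargeMessageFlowSketch
{
    private long endpointReserve; // stands in for native_transport_max_request_data_in_flight_per_ip
    private long globalReserve;   // stands in for native_transport_max_request_data_in_flight

    LargeMessageFlowSketch(long endpointReserve, long globalReserve)
    {
        this.endpointReserve = endpointReserve;
        this.globalReserve = globalReserve;
    }

    // Invoked only after the first frame has ALREADY been read into memory.
    boolean processFirstFrameOfLargeMessage(byte[] frame, int totalMessageSize, boolean throwOnOverload)
    {
        if (!tryAcquire(totalMessageSize))
        {
            // throwOnOverload == true : an overloaded error is reported to the client;
            // throwOnOverload == false: the message is let through "for free".
            // In both cases the frame below is still buffered and the remaining frames
            // of the message are read without any further capacity checks.
        }
        buffer(frame);
        return true; // tell the pipeline to keep reading subsequent frames
    }

    private boolean tryAcquire(long bytes)
    {
        if (bytes > endpointReserve || bytes > globalReserve)
            return false; // no capacity: back-pressure can only affect the NEXT messages
        endpointReserve -= bytes;
        globalReserve -= bytes;
        return true;
    }

    private void buffer(byte[] frame) { /* accumulate the frame payload in memory */ }
}
{code}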
As I see in the commit history:
[https://github.com/beobal/cassandra/commit/0aae5b4921a3ce6c21a8a0e977624c877b19cd5b]
originally we tried to pause the connection even for large messages, but that
caused starvation, and the logic was changed to avoid it by sacrificing the rate
limiting logic itself.
Recently the "Could not aquire capacity while processing native protocol message"
error message was added to this problematic branch as part of CASSANDRA-19534,
but the logic itself is the same.
Introducing a message size limit should help to reduce the probability of hitting
this branch, but I think it will not avoid it completely without extra changes to
the rate limiting logic itself (because we may hit the same case when we receive
several large enough messages, each one smaller than the limit).
Probably, if we have already checked that the message limit is <
min(native_transport_max_request_data_in_flight_per_ip,
native_transport_max_request_data_in_flight), then we can stop being afraid of
starvation and start pausing connections again.
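To make that condition concrete, the kind of startup validation I have in mind could
look roughly like the sketch below (the method and parameter names are hypothetical,
not existing DatabaseDescriptor code):
{code:java}
// Hypothetical startup validation: a single message must always fit into the
// smallest in-flight data reserve, otherwise pausing the connection could starve it.
static void validateNativeTransportLimits(long maxMessageSize,
                                          long maxDataInFlightPerIp,
                                          long maxDataInFlight)
{
    long smallestReserve = Math.min(maxDataInFlightPerIp, maxDataInFlight);
    if (maxMessageSize > smallestReserve)
        throw new IllegalArgumentException(
            "native_transport_max_message_size must not exceed " +
            "min(native_transport_max_request_data_in_flight_per_ip, " +
            "native_transport_max_request_data_in_flight)");
}
{code}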
----
{quote}If we fail to acquire capacity for the entire message when processing
the first frame, perhaps it would be better to mark the LargeMessage to
indicate this and have it not do the buffering. Essentially, always throw an
overloaded exception in this case, but also to override supply in
CQLMessageHandler.LargeMessage so we can continue to consume from the netty
buffers but don't call onIntactFrame if marked in this way.
{quote}
{quote}We could incorporate this protection by treating this in the same way as
a failure to acquire capacity, i.e. consume the remaining frames of the large
message but drop them and return an error when done.
{quote}
Yes, I was thinking about this "skipping large message" approach (it is similar
to the legacy handler logic -
org.apache.cassandra.transport.Envelope.Decoder#decode).
I am not sure that throwing an overloaded exception is a good response for the
max message size check - this exception/error response is assumed by the client to
be retryable, while exceeding the message limit is a permanent failure. Probably we
need to use InvalidRequestException here, as we do in the legacy handler logic.
I did not initially choose the "skipping large message" way for the following
reasons:
1) it is more complicated and correspondingly more error prone
2) there is no such logic in the internode messaging code for the same use case
At the same time I see an extra benefit of the skip logic: if we do not close the
channel after receiving a too-large message (as I am doing now), we will be able to
complete, rather than drop, the other normal in-flight requests received earlier
from the same channel.
I will try to update my MR to add the skip logic.
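A rough sketch of what I mean by the skip logic, just to illustrate the idea (this is
not the actual CQLMessageHandler.LargeMessage API, all names below are illustrative):
{code:java}
// Sketch: once a message is known to exceed the limit, keep draining its frames from
// the netty buffers without accumulating them, and answer with a non-retryable error
// when the last frame has been consumed; the channel itself stays usable.
final class SkippedLargeMessageSketch
{
    private final long sizeLimit;
    private final boolean oversized;
    private long remainingBytes;

    SkippedLargeMessageSketch(long declaredSize, long sizeLimit)
    {
        this.sizeLimit = sizeLimit;
        this.remainingBytes = declaredSize;
        this.oversized = declaredSize > sizeLimit;
    }

    /** Consumes one frame; returns true when the whole message has been consumed. */
    boolean supply(byte[] framePayload)
    {
        remainingBytes -= framePayload.length;
        if (!oversized)
            bufferForProcessing(framePayload); // normal path: accumulate the frame
        // oversized path: the frame is simply dropped
        if (remainingBytes <= 0)
            onComplete();
        return remainingBytes <= 0;
    }

    private void onComplete()
    {
        if (oversized)
            sendError("Request is too large, the limit is " + sizeLimit + " bytes"); // non-retryable, InvalidRequestException-style
        else
            processMessage();
    }

    private void bufferForProcessing(byte[] payload) { /* accumulate payload */ }
    private void processMessage() { /* decode and execute the complete CQL message */ }
    private void sendError(String message) { /* write an error response to the client */ }
}
{code}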
----
{quote}CASSANDRA-16886 makes the valid point that it is beneficial to filter
out mutations which can never be applied because they are too large for the
commit log as early as possible.
{quote}
Yes, I agree; this is why I mentioned this ticket and the related max_mutation_size
parameter here.
{quote}This could be a check of the total message size from the header against
Envelope.Decoder.MAX_TOTAL_LENGTH (which is taken directly from
DatabaseDescriptor::getNativeTransportMaxFrameSize) or even against
IMutation.MAX_MUTATION_SIZE.
{quote}
I suppose an explicit parameter would be more useful:
1) it is less confusing: DatabaseDescriptor::getNativeTransportMaxFrameSize can
be mixed up by users with the V5 frame size due to the similar name.
2) it gives users more flexibility if they want to tune the parameter
Also, I think having a separate, stricter limit for authentication messages is
better from a security point of view.
At the same time I agree that it makes sense to consider
IMutation.MAX_MUTATION_SIZE for the default value.
Currently I use the rate limit threshold
native_transport_max_request_data_in_flight_per_ip as the default value for the
following reasons:
1) to reduce the probability of getting into that problematic branch of the rate
limiting logic
2) to align the logic with internode messaging, which does the same
Probably, the default value for this property should be something like:
min(max_mutation_size, native_transport_max_request_data_in_flight_per_ip,
native_transport_max_request_data_in_flight).
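Expressed as code, the default derivation I have in mind would be roughly the
following (hypothetical helper, not an existing DatabaseDescriptor method):
{code:java}
// Hypothetical default for native_transport_max_message_size when it is not set explicitly.
static long defaultNativeTransportMaxMessageSize(long maxMutationSize,
                                                 long maxDataInFlightPerIp,
                                                 long maxDataInFlight)
{
    return Math.min(maxMutationSize, Math.min(maxDataInFlightPerIp, maxDataInFlight));
}
{code}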
----
{quote}For small messages I don't think the same problem exists given the
128KiB hard limit on the size of a frame payload, so I'm not sure any changes
to processOneContainedMessage should be necessary.
{quote}
I had the following thoughts in mind when I added this check to the
processOneContainedMessage logic:
1) while the check is definitely not needed here for normal CQL messages, it may
still be good to check the auth (AUTH_RESPONSE) message size here, in case somebody
sets its limit low enough (less than the max frame size = 128KiB) to get additional
protection.
2) code symmetry, to keep message processing for large and small messages as close
as possible; the similar logic in internode messaging actually does the same kind
of message size check in both branches.
But I agree that it is not really needed, and I am going to remove it to avoid
raising questions.
----
So, in total I am planning to apply the following changes in the MR
([https://github.com/apache/cassandra/pull/3655]):
1) remove the size check in processOneContainedMessage
2) add the "skipping large message" logic instead of throwing a fatal error that
closes the channel
3) set the default value for native_transport_max_message_size to
min(max_mutation_size, native_transport_max_request_data_in_flight_per_ip,
native_transport_max_request_data_in_flight)
Please let me know what you think about it.
> Size of CQL messages is not limited in V5 protocol logic
> --------------------------------------------------------
>
> Key: CASSANDRA-20052
> URL: https://issues.apache.org/jira/browse/CASSANDRA-20052
> Project: Cassandra
> Issue Type: Bug
> Components: Messaging/Client
> Reporter: Dmitry Konstantinov
> Assignee: Dmitry Konstantinov
> Priority: Normal
> Attachments: cassandra_rate_limit.svg
>
> Time Spent: 10m
> Remaining Estimate: 0h
>
> Size of CQL messages is not limited in V5 protocol logic
> - After the introduction of v5 frames we no longer have any CQL message size limit;
> native_transport_max_frame_size_in_mb, which enforced such a limit in the pre-V5
> epoch, is now applicable only to pre-V5 protocol sessions, otherwise it is
> applied only to the initial STARTUP/OPTIONS message handling and is not
> checked in any v5 logic. So, currently a v5 CQL message of any size can be
> sent to the Cassandra server.
> - The overload logic just lets huge messages be processed for free to avoid
> starvation, so it does not provide any protection against the most dangerous
> requests from a memory pressure point of view.
> - The situation is even more dangerous: the v5 framing logic is already enabled
> before authentication completes, so we do not limit the message size even for
> AUTH_RESPONSE messages from a client. This can be used as a DoS attack: a
> non-authenticated client can send a huge username/password to the Cassandra
> server to cause GC trouble or even kill it.
> An easy example:
> {code:java}
> import java.net.InetSocketAddress;
> import java.util.Arrays;
>
> import com.datastax.oss.driver.api.core.CqlSession;
> import com.datastax.oss.driver.internal.core.metadata.DefaultEndPoint;
>
> public class TestBigAuthRequest {
>     public static void main(String[] args) {
>         String password = getString(500_000_000, '-');
>         try (CqlSession session = CqlSession.builder()
>                 .addContactEndPoint(new DefaultEndPoint(new InetSocketAddress("localhost", 9042)))
>                 .withAuthCredentials("cassandra", password)
>                 .withLocalDatacenter("datacenter1")
>                 .build()) {
>             session.execute("select * from system.local");
>         }
>     }
>
>     private static String getString(int length, char charToFill) {
>         if (length > 0) {
>             char[] array = new char[length];
>             Arrays.fill(array, charToFill);
>             return new String(array);
>         }
>         return "";
>     }
> }
> {code}
> A thread stack of such an invocation (captured to show the execution flow):
> {code:java}
> "nioEventLoopGroup-5-21@9164" prio=10 tid=0x86 nid=NA runnable
>   java.lang.Thread.State: RUNNABLE
>       at org.apache.cassandra.transport.messages.AuthResponse$1.decode(AuthResponse.java:45)
>       at org.apache.cassandra.transport.messages.AuthResponse$1.decode(AuthResponse.java:39)
>       at org.apache.cassandra.transport.Message$Decoder.decodeMessage(Message.java:432)
>       at org.apache.cassandra.transport.Message$Decoder$RequestDecoder.decode(Message.java:467)
>       at org.apache.cassandra.transport.Message$Decoder$RequestDecoder.decode(Message.java:459)
>       at org.apache.cassandra.transport.CQLMessageHandler.processRequest(CQLMessageHandler.java:377)
>       at org.apache.cassandra.transport.CQLMessageHandler$LargeMessage.onComplete(CQLMessageHandler.java:755)
>       at org.apache.cassandra.net.AbstractMessageHandler$LargeMessage.supply(AbstractMessageHandler.java:561)
>       at org.apache.cassandra.net.AbstractMessageHandler.processSubsequentFrameOfLargeMessage(AbstractMessageHandler.java:257)
>       at org.apache.cassandra.net.AbstractMessageHandler.processIntactFrame(AbstractMessageHandler.java:229)
>       at org.apache.cassandra.net.AbstractMessageHandler.process(AbstractMessageHandler.java:216)
>       at org.apache.cassandra.transport.CQLMessageHandler.process(CQLMessageHandler.java:147)
>       at org.apache.cassandra.net.FrameDecoder.deliver(FrameDecoder.java:330)
>       at org.apache.cassandra.net.FrameDecoder.channelRead(FrameDecoder.java:294)
>       at org.apache.cassandra.net.FrameDecoder.channelRead(FrameDecoder.java:277)
>       at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>       at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>       at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>       at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
>       at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>       at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>       at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
>       at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
>       at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
>       at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
>       at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
>       at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
>       at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>       at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>       at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
>       at java.lang.Thread.run(Thread.java:829)
> {code}
> The provided MR (https://github.com/apache/cassandra/pull/3655) contains a
> fix for the issue which introduces 2 new parameters:
> native_transport_max_message_size - to limit the size of any CQL message
> native_transport_max_auth_message_size (default = 128KiB) - to limit the auth
> response message size more strictly and add extra protection against a
> possible DoS attack.
> Design questions:
> * The current implementation closes the CQL connection if a message is bigger
> than the limits. A skip-message-body logic could be implemented to keep the
> connection usable, but it is more complicated and error prone.
> * The tricky question is the default value for
> native_transport_max_message_size:
> on one side, we want it to be no more than
> min(native_transport_max_request_data_in_flight_per_ip,
> native_transport_max_request_data_in_flight) to reduce the chance of invoking
> the branch of logic where error handling does not work;
> on the other side, min(native_transport_max_request_data_in_flight_per_ip,
> native_transport_max_request_data_in_flight) can be too small, and there is a
> chance of breaking backward compatibility for existing deployments where
> people use large messages and small heaps (even though that is not a good idea).
> Related observations:
> 1) https://issues.apache.org/jira/browse/CASSANDRA-16886 - Reduce
> native_transport_max_frame_size_in_mb (from 256M to 16M)
> 2) Corresponding logic exists for the Cassandra internode protocol: a max
> message size limit is present, and it is validated to be not larger than the
> rate limiting (reserve capacity) parameters:
> internode_max_message_size =
> min(internode_application_receive_queue_reserve_endpoint_capacity,
> internode_application_send_queue_reserve_endpoint_capacity)
> internode_application_receive_queue_reserve_endpoint_capacity = 128MiB
> internode_application_send_queue_reserve_endpoint_capacity = 128MiB
> internode_max_message_size <=
> internode_application_receive_queue_reserve_endpoint_capacity
> internode_max_message_size <=
> internode_application_receive_queue_reserve_global_capacity
> internode_max_message_size <=
> internode_application_send_queue_reserve_endpoint_capacity
> internode_max_message_size <=
> internode_application_send_queue_reserve_global_capacity
> 3) Request types according to CQL specification:
> 4.1.1. STARTUP, in normal cases should be small
> 4.1.2. AUTH_RESPONSE, in normal cases should be small
> 4.1.3. OPTIONS, in normal cases should be small
> 4.1.4. QUERY, in normal cases should be small
> 4.1.5. PREPARE, in normal cases should be small
> 4.1.6. EXECUTE <-- potentially large in case of inserts, max_mutation_size =
> commitlog_segment_size / 2, where commitlog_segment_size = 32MiB by default
> 4.1.7. BATCH <-- potentially large, max_mutation_size =
> commitlog_segment_size / 2, where commitlog_segment_size = 32MiB by default
> 4.1.8. REGISTER, in normal cases should be small