[ https://issues.apache.org/jira/browse/CASSANDRA-13630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16121770#comment-16121770 ]

Jason Brown commented on CASSANDRA-13630:
-----------------------------------------

The core idea here is that if the outgoing message is large/huge, we don't want 
to naively allocate a huge buffer just for serialization. For example, if it's 
a large mutation (say 16MB), we don't want to allocate 16MB * n buffers on the 
coordinator, one per replica. A safer approach is to allocate standard-sized 
buffers (currently 64k), serialize into them via the {{DataOutputPlus}} 
interface, write each buffer to the netty channel when it fills up, and 
allocate another buffer to continue serialization.
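
To make the idea concrete, here's a rough sketch of the chunking approach (the 
names and structure are illustrative only, not the actual 
{{ByteBufDataOutputStreamPlus}} code):

{code:java}
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;

// Illustrative sketch only: serialize into fixed-size buffers and hand each
// one to the channel as it fills, instead of allocating one message-sized buffer.
final class ChunkingWriter
{
    private static final int BUFFER_SIZE = 1 << 16; // 64k, the current default

    private final Channel channel;
    private ByteBuf current;

    ChunkingWriter(Channel channel)
    {
        this.channel = channel;
        this.current = channel.alloc().directBuffer(BUFFER_SIZE);
    }

    void write(byte[] src, int offset, int len)
    {
        while (len > 0)
        {
            if (!current.isWritable())
            {
                channel.write(current); // full buffer goes downstream; netty releases it after writing
                current = channel.alloc().directBuffer(BUFFER_SIZE);
            }
            int count = Math.min(len, current.writableBytes());
            current.writeBytes(src, offset, count);
            offset += count;
            len -= count;
        }
    }

    void flush()
    {
        channel.writeAndFlush(current);
        current = channel.alloc().directBuffer(BUFFER_SIZE);
    }
}
{code}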

The outbound side, which splits serialization across multiple buffers, is 
implemented in {{MessageOutHandler.ByteBufDataOutputStreamPlus}}. At the same 
time, I've made it so that all messages are written into a shared buffer via 
that same class, whether it's a large message being chunked across multiple 
buffers or multiple small messages being aggregated into one buffer (think 
mutation ACKs). The upside here is that we don't need to go to the netty 
allocator for each individual small message; we just send the single 
'aggregation' buffer downstream in the channel when we need to flush.
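
In terms of the sketch above, the aggregation case looks like this (the 
serialized payloads are placeholders):

{code:java}
// Several small messages share one buffer and go downstream together on
// flush, rather than costing one allocator call per message.
writer.write(serializedAck1, 0, serializedAck1.length);
writer.write(serializedAck2, 0, serializedAck2.length);
writer.flush(); // the single 'aggregation' buffer is written to the channel
{code}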

As I implemented this behavior, I discovered that the 'aggregating buffer' 
could be a problem wrt {{MessageOutHandler#channelWritabilityChanged}}: when 
that method gets the signal that the channel is writable, it attempts to drain 
any backlog from {{OutboundMessagingConnection}} (via the 
{{MessageOutHandler#backlogSupplier}}). If I had retained the current code, it 
is quite likely that I would start serializing a backlogged message while in 
the middle of serializing another message (from {{MessageOutHandler#write}}) 
that had just filled the shared buffer and written it to the channel, 
interleaving the two serializations in the stream.

Further, I noticed I needed to forward-port more of CASSANDRA-13265 in order to 
handle expiring messages from the backlog. (FTR, 
{{MessageOutHandler#userEventTriggered}} handles closing the channel when we 
make no progress, but nothing else purges or removes items from the backlog 
queue. Closing the channel will fail any messages in the channel, but not 
those in the backlog.) Thus, I added the backlog-expiring behavior to 
{{OutboundMessagingConnection#expireMessages}}, and now drain messages from the 
backlog in {{MessageOutHandler#write}}. Trying to send the backlogged messages 
before the incoming message on the channel gives us a better shot at sending 
messages in the order in which they came into the 
{{OutboundMessagingConnection}}; see the sketch below.
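
A rough sketch of that ordering (the types and helpers here stand in for the 
real ones; the actual logic lives in {{MessageOutHandler#write}}):

{code:java}
import java.util.Queue;
import java.util.function.Supplier;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;

// Sketch only: drain the backlog before serializing the message handed to
// write(), so sends roughly follow arrival order into the connection.
// 'Msg' and serialize() are placeholders, not the actual types.
class DrainingWriteHandler extends ChannelOutboundHandlerAdapter
{
    interface Msg { boolean isExpired(); }

    private final Supplier<Queue<Msg>> backlogSupplier;

    DrainingWriteHandler(Supplier<Queue<Msg>> backlogSupplier)
    {
        this.backlogSupplier = backlogSupplier;
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
    {
        Msg backlogged;
        while ((backlogged = backlogSupplier.get().poll()) != null)
        {
            if (!backlogged.isExpired())   // expired entries are dropped, not sent
                serialize(ctx, backlogged);
        }
        serialize(ctx, (Msg) msg);         // then the message for this write()
        promise.setSuccess();
    }

    private void serialize(ChannelHandlerContext ctx, Msg m)
    {
        // chunked serialization into the shared buffer, as sketched earlier
    }
}
{code}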

I updated jctools to 2.0.2. Instead of using a {{LinkedBlockingQueue}} in 
{{OutboundMessagingConnection}} for the backlog, I decided to use something 
lock-free from jctools. Even though the queue still needs to be unbounded and 
multi-producer/multi-consumer (at least, to replicate existing behavior), the 
jctools queue should be a bit more efficient than an LBQ.
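
For illustration only, here's what swapping a jctools queue in behind the 
plain {{java.util.Queue}} interface looks like. {{MpscUnboundedArrayQueue}} is 
just one of the lock-free options, and it's multi-producer/single-consumer 
rather than MPMC, so don't read it as the actual class used in the branch:

{code:java}
import java.util.Queue;
import org.jctools.queues.MpscUnboundedArrayQueue;

public class BacklogQueueExample
{
    public static void main(String[] args)
    {
        // Drop-in replacement for LinkedBlockingQueue: same Queue interface,
        // but no lock contention between producers. The 1024 is the initial
        // chunk size of the backing array.
        Queue<String> backlog = new MpscUnboundedArrayQueue<>(1024);
        backlog.offer("queued message");  // never blocks; the queue grows as needed
        String next = backlog.poll();     // non-blocking; returns null when empty
        System.out.println(next);
    }
}
{code}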

Fixing the outbound side is only half of the problem, as we don't want to 
naively allocate a huge buffer on the receiving node, either. This is a bit 
trickier due to the blocking-IO style of our deserializers. Thus, similar to 
what I've done in CASSANDRA-12229, I need to add incoming {{ByteBuf}}s to a 
{{RebufferingByteBufDataInputPlus}} and spin up a background thread to perform 
the deserialization (a rough sketch follows the list below). Since we only 
need to spin up the thread when we have large message payloads, this will only 
happen in a minority of use cases:

- We are actually transmitting a message larger than 
{{OutboundMessagingPool#LARGE_MESSAGE_THRESHOLD}}, which defaults to 64k. At 
that point we're sending all of those messages over the outbound large message 
queue anyway, so all messages on that channel/socket will be over the threshold 
and require the background deserialization. So this won't apply to the small 
messages channel, where we can still handle all messages in-line on the 
inbound netty event loop.
- If you are operating a huge cluster (I'm guessing at least 500 nodes in 
size; I haven't done the math, tbh), large gossip messages might trigger the 
receiving gossip channel to switch to the background deserialization mode, 
especially ACK/ACK2 messages after a bounce, as they will contain all the 
{{ApplicationState}}s for all the peers in the cluster. I do not think this 
will be a problem in practice.
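
Here's the rough sketch of the rebuffering idea mentioned above (this is the 
concept only, not {{RebufferingByteBufDataInputPlus}} itself):

{code:java}
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import io.netty.buffer.ByteBuf;

// Concept sketch: the netty event loop appends ByteBufs as they arrive, while
// a background thread reads through them with blocking-IO semantics so the
// existing blocking deserializers can run unchanged.
class RebufferingInput extends InputStream
{
    private final BlockingQueue<ByteBuf> bufs = new LinkedBlockingQueue<>();
    private ByteBuf current;

    // called from the netty event loop as each inbound buffer arrives
    void append(ByteBuf buf)
    {
        bufs.add(buf);
    }

    // called from the background deserialization thread
    @Override
    public int read() throws IOException
    {
        try
        {
            while (current == null || !current.isReadable())
            {
                if (current != null)
                    current.release();  // done with the exhausted buffer
                current = bufs.take();  // blocks until the event loop appends more
            }
            return current.readByte() & 0xFF;
        }
        catch (InterruptedException e)
        {
            throw new IOException(e);
        }
    }
}
{code}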

I want to add more comments/documentation before committing, but that should 
not hold up a review. Also, this code is based on the current CASSANDRA-12229. 
The currently failing tests for this branch appear to be race conditions in 
the streaming code only, so I'll fix those on the CASSANDRA-12229 branch.

> support large internode messages with netty
> -------------------------------------------
>
>                 Key: CASSANDRA-13630
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-13630
>             Project: Cassandra
>          Issue Type: Task
>          Components: Streaming and Messaging
>            Reporter: Jason Brown
>            Assignee: Jason Brown
>             Fix For: 4.0
>
>
> As part of CASSANDRA-8457, we decided to punt on large messages to reduce the 
> scope of that ticket. However, we still need that functionality to ship a 
> correctly operating internode messaging subsystem.


