[ 
https://issues.apache.org/jira/browse/CASSANDRA-15066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16801278#comment-16801278
 ] 

Aleksey Yeschenko commented on CASSANDRA-15066:
-----------------------------------------------

h2. Inbound Path

We also fixed, and made improvements to the inbound messaging path.

First, there was a serious issue in mixed mode operation. The logic in the 
patch would rely on {{LRG}} flag set in the handshake to indicate that large 
messages are expected on that particular connection. Based on that flag an 
appropriate message handler would be picked - {{NonblockingBufferHandler}}, 
running on event loop, for small messages, or {{BlockingBufferHandler}}, 
running off event loop, for large messages. That flag, however, would never be 
set by 3.0 and 3.11 connections, and large messages from 3.0/3.11 nodes would 
end up blocking their event loops.

To fix that problem, we made sure that the single implementation of 
{{InboundMessageHandler}} could handle messages of any size effectively. We do 
this by deriving size of the incoming message from the byte stream, and based 
on that size either handle the message in-place, on the event loop, or to 
schedule a task ({{LargeCoprocessor}}) on an executor, off the event loop. In 
principle this also allows us to decouple the large message threshold for QOS 
purposes from the large message threshold for message processing purposes. For 
example, for QOS purposes you might want to send all messages over 64k over a 
separate connection, but to process all messages larger than 16k off event loop.

We also changed how off-eventloop processing is done. Previously, there would 
be a long-running companion thread that would keep processing messages so long 
as there were bytes to handle. After this patch, we’d schedule a single task on 
the shared executor (shared between inbound and outbound code), responsible for 
just one incoming message. We had to fix racy scheduling logic of the original 
patch to make it happen, but in the end it allowed to safely handle large 
message deser failures, and to allow parsing multiple large messages in 
parallel for the same connection, and to share the executor service between all 
connections, among other things. 

In addition to deriving size of the message from the stream, we also 
proactively read incoming message expiration time, before attempting to 
deserialize the entire message. If it’s expired at the time when we encounter 
it, we just skip the message in the byte stream altogether, without wasting any 
unnecessary work on it.

And if we actually fail to deserialize a message - say, because of table id or 
column being unknown still on the receiving side - we safely skip the bytes, 
without dropping the entire connection and losing all the buffered messages. We 
also immediately reply back to the coordinator node with the failure reason, 
rather than waiting for the coordinator callback to expire. This logic goes 
beyond single messages. If we encounter a corrupted frame, in most cases we 
will safely skip over it without dropping the connection and losing everything.

Similarly to the outbound path, inbound path now imposes strict limits on 
memory utilisation. Specifically, we bound the memory occupied by all parsed, 
but unprocessed messages -  on per-connection, per-endpoint, and global basis. 
Once a connection exceeds its local unprocessed capacity and cannot borrow any 
permits from per-endpoint and global reserve, it simply stops processing 
further messages, providing natural backpressure - until sufficient capacity is 
regained.

While working on this, we realised that there were literally no metrics 
covering inbound path - so we introduced some. Specifically, counters for 
received, expired, errored, and processed message count and byte size - exposed 
both via regular metrics and a new virtual table, {{internode_inbound}}.

Last, but not least, we fixed some more important bugs. To mention a couple:
1. Passing a buffer to the {{RebufferingByteBufDataInputPlus}} was potentially 
racing with its closing, potentially leaving the last appended buffer to be 
never released
2. {{RebufferingByteBufDataInputPlus#reBuffer()}} was mistakenly considering 
{{InputTimeoutException}} as fine to retry; in case of a genuine delay, it was 
*NOT* safe to retry from the current stream position, as {{reBuffer()}} would 
have irrevocably consumed part of the stream; this bug is still present in 
streaming code.

> Improvements to Internode Messaging
> -----------------------------------
>
>                 Key: CASSANDRA-15066
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-15066
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Messaging/Internode
>            Reporter: Benedict
>            Assignee: Benedict
>            Priority: Normal
>             Fix For: 4.0
>
>
> CASSANDRA-8457 introduced asynchronous networking to internode messaging, but 
> there have been several follow-up endeavours to improve some semantic issues. 
>  CASSANDRA-14503 and CASSANDRA-13630 are the latest such efforts, and were 
> combined some months ago into a single overarching refactor of the original 
> work, to address some of the issues that have been discovered.  Given the 
> criticality of this work to the project, we wanted to bring some more eyes to 
> bear to ensure the release goes ahead smoothly.  In doing so, we uncovered a 
> number of issues with messaging, some of which long standing, that we felt 
> needed to be addressed.  This patch widens the scope of CASSANDRA-14503 and 
> CASSANDRA-13630 in an effort to close the book on the messaging service, at 
> least for the foreseeable future.
> The patch includes a number of clarifying refactors that touch outside of the 
> {{net.async}} package, and a number of semantic changes to the {{net.async}} 
> packages itself.  We believe it clarifies the intent and behaviour of the 
> code while improving system stability, which we will outline in comments 
> below.
> https://github.com/belliottsmith/cassandra/tree/messaging-improvements



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to