[
https://issues.apache.org/jira/browse/CASSANDRA-15066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16801278#comment-16801278
]
Aleksey Yeschenko commented on CASSANDRA-15066:
-----------------------------------------------
h2. Inbound Path
We also fixed bugs in, and made improvements to, the inbound messaging path.
First, there was a serious issue in mixed-mode operation. The logic in the
patch relied on the {{LRG}} flag, set during the handshake, to indicate that
large messages were expected on that particular connection. Based on that flag,
an appropriate message handler would be picked - {{NonblockingBufferHandler}},
running on the event loop, for small messages, or {{BlockingBufferHandler}},
running off the event loop, for large messages. That flag, however, would never
be set by 3.0 and 3.11 connections, so large messages from 3.0/3.11 nodes would
end up blocking their event loops.
To fix that problem, we made sure that a single implementation,
{{InboundMessageHandler}}, could handle messages of any size effectively. We do
this by deriving the size of the incoming message from the byte stream and,
based on that size, either handling the message in-place, on the event loop, or
scheduling a task ({{LargeCoprocessor}}) on an executor, off the event loop. In
principle this also allows us to decouple the large-message threshold for QOS
purposes from the large-message threshold for message-processing purposes. For
example, for QOS purposes you might want to send all messages over 64k over a
separate connection, but process all messages larger than 16k off the event loop.
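The dispatch decision described above can be sketched roughly as follows; the threshold constant, class name, and return values are illustrative assumptions, not Cassandra's actual identifiers:

```java
public class InboundDispatchSketch
{
    // hypothetical processing threshold: messages larger than this are
    // deserialized off the event loop (the QOS threshold for routing a
    // message to the dedicated large-message connection could differ)
    static final int OFF_LOOP_PROCESSING_THRESHOLD = 16 * 1024;

    /** size is derived from the byte stream before deserialization begins */
    static String dispatch(int serializedSize)
    {
        if (serializedSize > OFF_LOOP_PROCESSING_THRESHOLD)
            return "off-loop";  // schedule a LargeCoprocessor-style task on an executor
        return "in-place";      // deserialize directly on the event loop
    }
}
```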
We also changed how off-event-loop processing is done. Previously, a
long-running companion thread would keep processing messages for as long as
there were bytes to handle. After this patch, we schedule a single task on the
shared executor (shared between the inbound and outbound code), responsible for
just one incoming message. We had to fix the racy scheduling logic of the
original patch to make this happen, but in the end it allowed us to safely
handle large-message deserialization failures, parse multiple large messages in
parallel for the same connection, and share the executor service between all
connections, among other things.
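A minimal sketch of the one-task-per-message scheme, with invented names (none of these are Cassandra's actual classes):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PerMessageTaskSketch
{
    // one executor shared by all connections (daemon threads, for the sketch)
    static final ExecutorService SHARED = Executors.newFixedThreadPool(4, r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
    });

    // one task per fully assembled large message: a deserialization failure
    // affects only this message, and multiple large messages from the same
    // connection can be processed in parallel
    static Future<String> processLargeMessage(byte[] frame)
    {
        return SHARED.submit(() -> "processed " + frame.length + " bytes");
    }

    static String processAndWait(byte[] frame)
    {
        try { return processLargeMessage(frame).get(); }
        catch (Exception e) { throw new RuntimeException(e); }
    }
}
```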
In addition to deriving the size of the message from the stream, we also
proactively read the incoming message's expiration time before attempting to
deserialize the entire message. If it has already expired by the time we
encounter it, we simply skip the message in the byte stream altogether, without
wasting any work on it.
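The skip can be illustrated with a toy header layout of [size][expiration][payload]; the real wire format differs, this only shows the idea of discarding an expired message without deserializing it:

```java
import java.nio.ByteBuffer;

public class ExpirationSkipSketch
{
    /**
     * Hypothetical header: [int payloadSize][long expiresAtNanos][payload].
     * Returns the payload size to deserialize, or -1 if the message was
     * expired and its bytes were skipped wholesale.
     */
    static int maybeSkip(ByteBuffer in, long nowNanos)
    {
        int payloadSize = in.getInt();
        long expiresAt = in.getLong();
        if (nowNanos >= expiresAt)
        {
            in.position(in.position() + payloadSize); // skip the body entirely
            return -1;
        }
        return payloadSize; // caller proceeds to deserialize payloadSize bytes
    }
}
```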
And if we actually fail to deserialize a message - say, because a table id or
column is still unknown on the receiving side - we safely skip its bytes,
without dropping the entire connection and losing all the buffered messages. We
also immediately reply to the coordinator node with the failure reason, rather
than waiting for the coordinator's callback to expire. This logic goes beyond
single messages: if we encounter a corrupted frame, in most cases we will
safely skip over it without dropping the connection and losing everything.
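The recovery path works because the serialized size is known before deserialization starts, so a failure can skip exactly this message's bytes. A sketch under invented names (the real code would also send the failure response to the coordinator at the marked point):

```java
import java.nio.ByteBuffer;

public class DeserFailureSketch
{
    interface Deserializer { Object deserialize(ByteBuffer in); }

    /** returns true if processed, false if skipped after a deserialization failure */
    static boolean readOne(ByteBuffer in, int serializedSize, Deserializer d)
    {
        int start = in.position();
        try
        {
            d.deserialize(in);
            return true;
        }
        catch (RuntimeException e)
        {
            // skip the rest of this message only; the connection and any
            // buffered messages after it survive intact
            in.position(start + serializedSize);
            // (here: reply to the coordinator with the failure reason)
            return false;
        }
    }
}
```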
Similarly to the outbound path, the inbound path now imposes strict limits on
memory utilisation. Specifically, we bound the memory occupied by all parsed
but unprocessed messages - on a per-connection, per-endpoint, and global basis.
Once a connection exceeds its local unprocessed capacity and cannot borrow any
permits from the per-endpoint and global reserves, it simply stops processing
further messages, providing natural backpressure, until sufficient capacity is
regained.
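The three-level permit scheme might look roughly like this simplified sketch (the real implementation uses lock-free atomics, and these names are illustrative):

```java
public class InboundLimitsSketch
{
    static class Limit
    {
        long remaining;
        Limit(long capacity) { remaining = capacity; }
        synchronized boolean tryAcquire(long bytes)
        {
            if (bytes > remaining)
                return false;
            remaining -= bytes;
            return true;
        }
        synchronized void release(long bytes) { remaining += bytes; }
    }

    final Limit connection, endpoint, global;

    InboundLimitsSketch(long connectionCap, long endpointCap, long globalCap)
    {
        connection = new Limit(connectionCap);
        endpoint = new Limit(endpointCap);
        global = new Limit(globalCap);
    }

    // a message is admitted from the connection's own capacity, or by
    // borrowing from both the per-endpoint and global reserves; if neither
    // succeeds, the connection stops reading until capacity is released
    boolean tryAcquire(long bytes)
    {
        if (connection.tryAcquire(bytes))
            return true;
        if (endpoint.tryAcquire(bytes))
        {
            if (global.tryAcquire(bytes))
                return true;
            endpoint.release(bytes);
        }
        return false;
    }
}
```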
While working on this, we realised that there were literally no metrics
covering the inbound path - so we introduced some. Specifically, counters for
received, expired, errored, and processed message counts and byte sizes -
exposed both via regular metrics and a new virtual table, {{internode_inbound}}.
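The shape of those counters, sketched with illustrative field names (not the actual metric names):

```java
import java.util.concurrent.atomic.LongAdder;

public class InboundMetricsSketch
{
    // count and byte-size pairs for each outcome of an inbound message
    final LongAdder receivedCount = new LongAdder();
    final LongAdder receivedBytes = new LongAdder();
    final LongAdder expiredCount = new LongAdder();
    final LongAdder expiredBytes = new LongAdder();
    final LongAdder errorCount = new LongAdder();
    final LongAdder errorBytes = new LongAdder();
    final LongAdder processedCount = new LongAdder();
    final LongAdder processedBytes = new LongAdder();

    void onReceived(int bytes)  { receivedCount.increment();  receivedBytes.add(bytes); }
    void onExpired(int bytes)   { expiredCount.increment();   expiredBytes.add(bytes); }
    void onError(int bytes)     { errorCount.increment();     errorBytes.add(bytes); }
    void onProcessed(int bytes) { processedCount.increment(); processedBytes.add(bytes); }
}
```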
Last, but not least, we fixed a few more important bugs. To mention a couple:
1. Passing a buffer to {{RebufferingByteBufDataInputPlus}} could race with
closing it, potentially leaving the last appended buffer never released.
2. {{RebufferingByteBufDataInputPlus#reBuffer()}} mistakenly treated
{{InputTimeoutException}} as safe to retry; in case of a genuine delay, it was
*NOT* safe to retry from the current stream position, as {{reBuffer()}} would
have irrevocably consumed part of the stream. This bug is still present in
streaming code.
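Why retrying from the current position is unsafe can be shown with a toy stream (this is an illustration of the hazard, not the actual {{reBuffer()}} code): once some bytes have been consumed, a "retried" read is misaligned and silently returns the wrong value.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

public class RetryHazardSketch
{
    // simulate a read that consumes one byte and then "times out";
    // retrying the logical read from the current position now sees
    // shifted data, because the consumed byte cannot be put back
    static int retryAfterPartialConsume(byte[] stream)
    {
        try
        {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(stream));
            in.readByte();       // partial consumption before the timeout
            return in.readInt(); // the "retried" read, misaligned by one byte
        }
        catch (IOException e)
        {
            throw new RuntimeException(e);
        }
    }
}
```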
> Improvements to Internode Messaging
> -----------------------------------
>
> Key: CASSANDRA-15066
> URL: https://issues.apache.org/jira/browse/CASSANDRA-15066
> Project: Cassandra
> Issue Type: Improvement
> Components: Messaging/Internode
> Reporter: Benedict
> Assignee: Benedict
> Priority: Normal
> Fix For: 4.0
>
>
> CASSANDRA-8457 introduced asynchronous networking to internode messaging, but
> there have been several follow-up endeavours to address some semantic issues.
> CASSANDRA-14503 and CASSANDRA-13630 are the latest such efforts, and were
> combined some months ago into a single overarching refactor of the original
> work, to address some of the issues that have been discovered. Given the
> criticality of this work to the project, we wanted to bring some more eyes to
> bear to ensure the release goes ahead smoothly. In doing so, we uncovered a
> number of issues with messaging, some of them long-standing, that we felt
> needed to be addressed. This patch widens the scope of CASSANDRA-14503 and
> CASSANDRA-13630 in an effort to close the book on the messaging service, at
> least for the foreseeable future.
> The patch includes a number of clarifying refactors that touch outside of the
> {{net.async}} package, and a number of semantic changes to the {{net.async}}
> package itself. We believe it clarifies the intent and behaviour of the
> code while improving system stability, which we will outline in comments
> below.
> https://github.com/belliottsmith/cassandra/tree/messaging-improvements
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)