[
https://issues.apache.org/jira/browse/CASSANDRA-12229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16019560#comment-16019560
]
Jason Brown commented on CASSANDRA-12229:
-----------------------------------------
Given [~aweisberg]'s comments on PR2, I've gone back and clarified/simplified
the use of nio channels and input/output streams. To that end I'm also reusing
the ByteBuffer-based Data*Plus classes, especially {{RebufferingInputStream}}.
Making these changes not only clarified this patch, but also simplified the
existing streaming code's habit of wrapping channels in input streams, and
vice versa. Further, by switching to use the {{DataInput}} model rather than
the {{InputStream}}, I'm able to read primitives more efficiently at the
{{StreamReader.StreamDeserializer}}. These changes also necessitated switching
{{StreamMessage.Serializer.deserialize}} to take a {{DataInputPlus}} rather than a
{{ReadableByteChannel}}, but that's small beans.
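To illustrate the primitive-reading point, here is a minimal sketch using only the JDK. The class and method names are hypothetical and nothing here is taken from the patch; it only contrasts assembling an int from single-byte {{InputStream}} reads with pulling it out of a {{ByteBuffer}} in one call.

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

// Hypothetical illustration; none of these names come from the patch.
final class PrimitiveReadSketch
{
    // InputStream model: a big-endian int is assembled from four single-byte reads.
    static int readIntByteAtATime(InputStream in) throws IOException
    {
        int b1 = in.read(), b2 = in.read(), b3 = in.read(), b4 = in.read();
        if ((b1 | b2 | b3 | b4) < 0) // any -1 means the stream ended early
            throw new EOFException();
        return (b1 << 24) | (b2 << 16) | (b3 << 8) | b4;
    }

    // Buffer-backed model: the int is read from the buffer in a single call.
    static int readIntFromBuffer(ByteBuffer buffer)
    {
        return buffer.getInt();
    }

    public static void main(String[] args) throws IOException
    {
        byte[] bytes = ByteBuffer.allocate(4).putInt(0xCAFEBABE).array();
        System.out.println(Integer.toHexString(readIntByteAtATime(new ByteArrayInputStream(bytes))));
        System.out.println(Integer.toHexString(readIntFromBuffer(ByteBuffer.wrap(bytes))));
    }
}
```

Both paths return the same value; the difference is only in how many calls it takes to get there.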
The net result of this optimization is that a first, naive perf run shows the
compressed sstable streaming is about 10% faster than trunk, and streamed
compressed sstables are about on-par with trunk. Clearly, this is better than
the last commit.
All these changes were pushed as a single commit to the same branch. That
updated the existing PR, which I'm not sure I really wanted to have happen, but
all the comments have been preserved.
The basic use of streams looks like this now:
outbound
- {{NettyStreamingMessageSender.FileStreamTask}} creates
{{ByteBufDataOutputStreamPlus}}, and passes that to
{{StreamMessage#serialize}}. {{ByteBufDataOutputStreamPlus}} extends
{{BufferedDataOutputStreamPlus}} and writes to the netty channel on flush. It
still has the {{#waitUntilWritable()}} checks for testing when it's safe to
write to the channel, and I still need to reevaluate that design based on
Ariel's concerns.
- {{StreamWriter}} will wrap the {{ByteBufDataOutputStreamPlus}} with
{{ByteBufCompressionDataOutputStreamPlus}}, which is responsible for (obviously)
compressing any outbound file data on the fly.
- {{CompressedStreamWriter}} doesn't wrap, and just writes directly to
{{ByteBufDataOutputStreamPlus}}
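The outbound layering above can be sketched roughly as follows, using only the JDK. This is not the patch's code: {{ChannelBackedOutputSketch}} and {{OutboundSketch}} are hypothetical names, a plain {{WritableByteChannel}} stands in for the netty channel, {{DeflaterOutputStream}} stands in for whatever codec the compressing wrapper actually uses, and the {{#waitUntilWritable()}} back-pressure check is left out entirely.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.zip.DeflaterOutputStream;

// Hypothetical stand-in for a buffered stream that writes to a channel on flush.
final class ChannelBackedOutputSketch extends OutputStream
{
    private final WritableByteChannel channel;
    private final ByteBuffer buffer;

    ChannelBackedOutputSketch(WritableByteChannel channel, int bufferSize)
    {
        this.channel = channel;
        this.buffer = ByteBuffer.allocate(bufferSize);
    }

    @Override
    public void write(int b) throws IOException
    {
        if (!buffer.hasRemaining())
            flush(); // buffer is full: hand it to the channel first
        buffer.put((byte) b);
    }

    @Override
    public void flush() throws IOException
    {
        buffer.flip();
        while (buffer.hasRemaining())
            channel.write(buffer);
        buffer.clear();
    }

    @Override
    public void close() throws IOException
    {
        flush();
        channel.close();
    }
}

final class OutboundSketch
{
    // Wraps the channel-backed stream with a compressor only when asked to,
    // mirroring the "wrap vs. write directly" split between the two writers.
    static byte[] send(byte[] payload, boolean compressOnTheFly) throws IOException
    {
        ByteArrayOutputStream wire = new ByteArrayOutputStream(); // pretend network
        OutputStream out = new ChannelBackedOutputSketch(Channels.newChannel(wire), 64);
        if (compressOnTheFly)
            out = new DeflaterOutputStream(out); // stand-in codec, an assumption
        out.write(payload);
        out.close();
        return wire.toByteArray();
    }

    public static void main(String[] args) throws IOException
    {
        byte[] payload = new byte[4096];
        System.out.println("plain: " + send(payload, false).length + " bytes");
        System.out.println("compressed: " + send(payload, true).length + " bytes");
    }
}
```

The point of the shape is that the writer only ever sees one output stream, and whether compression happens is decided by what that stream is wrapped in.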
inbound
- {{StreamingInboundHandler}} moves all incoming {{ByteBuf}} s into
{{RebufferingByteBufDataInputPlus}}. {{RebufferingByteBufDataInputPlus}}
extends {{RebufferingInputStream}}, and handles adjusting the netty channel's
auto read like I had in previous commits. It also implements
{{ReadableByteChannel}} to enable writing (copying) to a direct {{ByteBuffer}}
rather than always copying to an on-heap byte array.
- {{StreamReader}} will wrap the {{RebufferingByteBufDataInputPlus}} with
{{StreamCompressionInputStream}}, which decompresses incoming data.
{{StreamCompressionInputStream}} is wrapped with {{TrackedDataInputPlus}}
rather than {{TrackedInputStream}} to optimize reading primitives from the stream.
- {{CompressedStreamReader}} still uses {{CompressedInputStream}}, but that
class now extends {{RebufferingInputStream}} as well, to a) read primitives
more efficiently than the existing {{InputStream}}-derived implementation, and
b) use {{ByteBuffer}} for direct buffers rather than using on-heap byte arrays.
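For the inbound side, a rough, single-threaded sketch of the rebuffering idea is below. {{QueuedBufferInputSketch}} is a hypothetical name; it uses plain {{ByteBuffer}} chunks in place of {{ByteBuf}}, assumes big-endian buffers, and has no blocking or auto-read handling, so it only shows the rebuffer step, the primitive fast path, and the {{ReadableByteChannel}} copy into a caller-supplied (possibly direct) buffer.

```java
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical, single-threaded stand-in; it never blocks waiting for data.
final class QueuedBufferInputSketch implements ReadableByteChannel
{
    private final Queue<ByteBuffer> queue = new ArrayDeque<>();
    private ByteBuffer current = ByteBuffer.allocate(0);
    private boolean open = true;

    // Called by the (hypothetical) inbound handler for each arriving chunk.
    void append(ByteBuffer chunk)
    {
        queue.add(chunk);
    }

    // Swap in the next queued chunk once the current one is exhausted.
    private boolean rebuffer()
    {
        while (!current.hasRemaining())
        {
            ByteBuffer next = queue.poll();
            if (next == null)
                return false;
            current = next;
        }
        return true;
    }

    byte readByte() throws IOException
    {
        if (!rebuffer())
            throw new EOFException();
        return current.get();
    }

    // Fast path when the whole int sits in the current chunk, else byte by byte.
    int readInt() throws IOException
    {
        if (current.remaining() >= 4)
            return current.getInt();
        return ((readByte() & 0xFF) << 24) | ((readByte() & 0xFF) << 16)
             | ((readByte() & 0xFF) << 8) | (readByte() & 0xFF);
    }

    // ReadableByteChannel: copy straight into the caller's buffer, no byte[] hop.
    @Override
    public int read(ByteBuffer dst)
    {
        int copied = 0;
        while (dst.hasRemaining() && rebuffer())
        {
            int n = Math.min(dst.remaining(), current.remaining());
            ByteBuffer slice = current.duplicate();
            slice.limit(slice.position() + n);
            dst.put(slice);
            current.position(current.position() + n);
            copied += n;
        }
        return copied == 0 && dst.hasRemaining() ? -1 : copied;
    }

    @Override
    public boolean isOpen() { return open; }

    @Override
    public void close() { open = false; }

    public static void main(String[] args) throws IOException
    {
        QueuedBufferInputSketch in = new QueuedBufferInputSketch();
        in.append(ByteBuffer.wrap(new byte[] { 0, 0 }));
        in.append(ByteBuffer.wrap(new byte[] { 1, 2, 9, 8, 7 }));
        System.out.println(in.readInt()); // int spans two chunks
        System.out.println(in.read(ByteBuffer.allocateDirect(8)) + " bytes copied");
    }
}
```

Returning -1 when the queue is empty is a simplification for the sketch; a real inbound stream would wait for more data rather than report end-of-stream.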
There are a few minor wrinkles to iron out, but that shouldn't hold up review.
For example, {{CompressedStreamWriter#write}} casts the {{DataOutputStreamPlus}} to
{{ByteBufDataOutputStreamPlus}} to make sure we can write a {{ByteBuf}} to it
(as {{ByteBuffer}} will cause a copy to the backing buffer to occur, which I'd
like to avoid). There are also some comments from PR2 that I haven't addressed
yet, such as checksums and the timing problem with {{#waitUntilWritable()}}, but I
wanted to get the streams situation under better control.
> Move streaming to non-blocking IO and netty (streaming 2.1)
> -----------------------------------------------------------
>
> Key: CASSANDRA-12229
> URL: https://issues.apache.org/jira/browse/CASSANDRA-12229
> Project: Cassandra
> Issue Type: Improvement
> Components: Streaming and Messaging
> Reporter: Jason Brown
> Assignee: Jason Brown
> Fix For: 4.0
>
>
> As followup work to CASSANDRA-8457, we need to move streaming to use netty.
> Streaming 2.0 (CASSANDRA-5286) brought many good improvements to how files
> are transferred between nodes in a cluster. However, the low-level details of
> the current streaming implementation do not line up nicely with a
> non-blocking model, so I think this is a good time to review some of those
> details and add in additional goodness. The current implementation assumes a
> sequential or "single threaded" approach to the sending of stream messages as
> well as the transfer of files. In short, after several iterative prototypes,
> I propose the following:
> 1) use a single bi-directional connection (instead of requiring two
> sockets & two threads)
> 2) send the "non-file" {{StreamMessage}} s (basically anything not
> {{OutboundFileMessage}}) via the normal internode messaging. This will
> require a slight bit more management of the session (the ability to look up a
> {{StreamSession}} from a static function on {{StreamManager}}), but we
> have most of the pieces we need for this already.
> 3) switch to a non-blocking IO model (facilitated via netty)
> 4) Allow files to be streamed in parallel (CASSANDRA-4663) - this should just
> be a thing already
> 5) If the entire sstable is to be streamed, in addition to the DATA component,
> transfer all the components of the sstable (primary index, bloom filter,
> stats, and so on). This way we can avoid the CPU and GC pressure from
> deserializing the stream into objects. File streaming then amounts to a
> block-level transfer.
> Note: The progress/results of CASSANDRA-11303 will need to be reflected here,
> as well.