[ https://issues.apache.org/jira/browse/CASSANDRA-12229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16019560#comment-16019560 ]

Jason Brown commented on CASSANDRA-12229:
-----------------------------------------

Given [~aweisberg]'s comments on PR2, I've gone back and clarified/simplified 
the use of nio channels and input/output streams. To that end I'm also reusing 
the ByteBuffer-based Data*Plus classes, especially {{RebufferingInputStream}}. 
Making these changes not only clarified this set of changes, but also simplified the 
existing streaming code's habit of wrapping channels in input streams, and 
vice versa. Further, by switching to the {{DataInput}} model rather than 
{{InputStream}}, I'm able to read primitives more efficiently in 
{{StreamReader.StreamDeserializer}}. These changes also required switching 
{{StreamMessage.Serializer.deserialize}} to take a {{DataInputPlus}} rather than a 
{{ReadableByteChannel}}, but that's small beans.
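To illustrate the primitive-reading point with plain JDK types (this is not Cassandra's {{RebufferingInputStream}}, only a minimal sketch of the idea): a caller that only has an {{InputStream}} must assemble an int from individual bytes, while a reader that owns a {{ByteBuffer}} can decode it with a single {{ByteBuffer#getInt}} once enough bytes are buffered. The class {{BufferedPrimitiveReader}} and its methods are hypothetical names.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

// Hypothetical sketch, not Cassandra's RebufferingInputStream: decode primitives
// straight from a ByteBuffer instead of byte-by-byte through an InputStream.
final class BufferedPrimitiveReader {
    private final ByteBuffer buffer; // big-endian by default, like java.io.DataInput

    BufferedPrimitiveReader(ByteBuffer buffer) {
        this.buffer = buffer;
    }

    // One call decodes all four bytes; throws BufferUnderflowException if fewer
    // than four bytes remain (a rebuffering reader would fetch more data here).
    int readInt() {
        return buffer.getInt();
    }

    // Same idea for an eight-byte long.
    long readLong() {
        return buffer.getLong();
    }

    // What a caller limited to InputStream has to do: four single-byte read()
    // calls plus manual shifting.
    static int readIntByteByByte(InputStream in) {
        try {
            int b1 = in.read(), b2 = in.read(), b3 = in.read(), b4 = in.read();
            if ((b1 | b2 | b3 | b4) < 0)
                throw new EOFException("stream ended before 4 bytes were read");
            return (b1 << 24) | (b2 << 16) | (b3 << 8) | b4;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Both paths produce the same big-endian value; the buffer-owning reader simply avoids the per-byte calls.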

The net result of this optimization is that a first, naive perf run shows the 
compressed sstable streaming is about 10% faster than trunk, and streamed 
compressed sstables are about on-par with trunk. Clearly, this is better than 
the last commit.


All these changes were pushed as a single commit to the same branch. That 
updated the existing PR, which I'm not sure I wanted, but 
all the comments have been preserved.

The basic use of streams looks like this now:
outbound

- {{NettyStreamingMessageSender.FileStreamTask}} creates 
{{ByteBufDataOutputStreamPlus}}, and passes that to 
{{StreamMessage#serialize}}. {{ByteBufDataOutputStreamPlus}} extends 
{{BufferedDataOutputStreamPlus}} and writes to the netty channel on flush. It 
still has the {{#waitUntilWritable()}} checks for testing when it's safe to 
write to the channel, and I still need to reevaluate that design based on 
Ariel's concerns.
- {{StreamWriter}} will wrap the {{ByteBufDataOutputStreamPlus}} with 
{{ByteBufCompressionDataOutputStreamPlus}}, which is responsible for 
compressing any outbound file data on the fly.
- {{CompressedStreamWriter}} doesn't wrap, and just writes directly to 
{{ByteBufDataOutputStreamPlus}} 
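A rough sketch of the outbound layering, assuming a {{Consumer<ByteBuffer>}} sink as a stand-in for the netty channel so it runs without netty: writes accumulate in a fixed-size buffer, and each filled or flushed buffer is handed to the sink, roughly what {{ByteBufDataOutputStreamPlus}} is described as doing on flush. {{FlushingBufferedOutput}} is a hypothetical name, and the sketch leaves out the {{#waitUntilWritable()}} back-pressure check.

```java
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.function.Consumer;

// Hypothetical stand-in for ByteBufDataOutputStreamPlus: buffers writes and
// hands each filled or flushed buffer to a sink (standing in for the netty channel).
final class FlushingBufferedOutput extends OutputStream {
    private final Consumer<ByteBuffer> sink;
    private ByteBuffer buffer;

    FlushingBufferedOutput(int capacity, Consumer<ByteBuffer> sink) {
        if (capacity <= 0)
            throw new IllegalArgumentException("capacity must be positive");
        this.sink = sink;
        this.buffer = ByteBuffer.allocate(capacity);
    }

    @Override
    public void write(int b) {
        if (!buffer.hasRemaining())
            flush(); // buffer full: push it downstream before continuing
        buffer.put((byte) b);
    }

    @Override
    public void write(byte[] src, int off, int len) {
        Objects.checkFromIndexSize(off, len, src.length);
        while (len > 0) {
            if (!buffer.hasRemaining())
                flush();
            int n = Math.min(len, buffer.remaining());
            buffer.put(src, off, n);
            off += n;
            len -= n;
        }
    }

    // Hand the buffered bytes to the sink and start a fresh buffer, so the sink
    // may keep a reference to what it received.
    @Override
    public void flush() {
        if (buffer.position() == 0)
            return; // nothing buffered
        buffer.flip();
        sink.accept(buffer);
        buffer = ByteBuffer.allocate(buffer.capacity());
    }

    @Override
    public void close() {
        flush();
    }
}
```

A compressing layer like the one {{StreamWriter}} adds would then just be another stream wrapped around this one; in a test, the JDK's {{java.util.zip.DeflaterOutputStream}} could play that role, though it is only a stand-in and not necessarily the codec used for streaming. Allocating a fresh buffer after each flush keeps the sketch safe if the sink holds on to what it received, at the cost of one allocation per flush.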

inbound
- {{StreamingInboundHandler}} moves all incoming {{ByteBuf}} s into 
{{RebufferingByteBufDataInputPlus}}. {{RebufferingByteBufDataInputPlus}} 
extends {{RebufferingInputStream}} and handles toggling the netty channel's 
auto-read setting, as in previous commits. It also implements 
{{ReadableByteChannel}} to enable copying into a direct {{ByteBuffer}} 
rather than always copying into an on-heap byte array.
- {{StreamReader}} will wrap the {{RebufferingByteBufDataInputPlus}} with 
{{StreamCompressionInputStream}}, which decompresses incoming data. 
{{StreamCompressionInputStream}} is wrapped with {{TrackedDataInputPlus}} 
rather than {{TrackedInputStream}} to optimize reading primitives from the stream.
- {{CompressedStreamReader}} still uses {{CompressedInputStream}}, but that 
class now extends {{RebufferingInputStream}}, as well, to a) efficiently read 
primitives vs. the existing {{InputStream}}-derived implementation, and b) use 
{{ByteBuffer}} for direct buffers rather than using on-heap byte arrays.
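The inbound side can be sketched the same way. This is a minimal, netty-free model under stated assumptions: the network thread appends incoming buffers to a queue, a single reader thread blocks until the next buffer arrives, and a hypothetical {{Consumer<Boolean>}} hook stands in for toggling the channel's auto-read when the queue crosses high/low watermarks (the watermark scheme is an assumption of this sketch, not necessarily what the branch does). {{QueueBackedInput}}, {{append}}, and {{endOfStream}} are hypothetical names; the {{read(ByteBuffer)}} overload follows the shape of {{ReadableByteChannel#read}} without implementing the interface.

```java
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical, netty-free model of a rebuffering input. Assumes one network
// thread calling append/endOfStream and one reader thread calling read.
final class QueueBackedInput extends InputStream {
    private static final ByteBuffer EMPTY = ByteBuffer.allocate(0);
    private static final ByteBuffer END_OF_STREAM = ByteBuffer.allocate(0); // sentinel, compared by identity

    private final BlockingQueue<ByteBuffer> queue = new LinkedBlockingQueue<>();
    private final Object lock = new Object(); // guards autoRead and the toggle decisions
    private final int lowWaterMark;
    private final int highWaterMark;
    private final Consumer<Boolean> setAutoRead; // hypothetical hook standing in for the channel config
    private boolean autoRead = true;
    private ByteBuffer current = EMPTY; // touched only by the reader thread

    QueueBackedInput(int lowWaterMark, int highWaterMark, Consumer<Boolean> setAutoRead) {
        if (lowWaterMark < 0 || lowWaterMark >= highWaterMark)
            throw new IllegalArgumentException("need 0 <= lowWaterMark < highWaterMark");
        this.lowWaterMark = lowWaterMark;
        this.highWaterMark = highWaterMark;
        this.setAutoRead = setAutoRead;
    }

    // Network thread: enqueue a buffer and pause reading if the reader falls behind.
    void append(ByteBuffer buf) {
        synchronized (lock) {
            queue.add(buf);
            if (autoRead && queue.size() >= highWaterMark) {
                autoRead = false;
                setAutoRead.accept(false);
            }
        }
    }

    // Network thread: the peer has finished sending.
    void endOfStream() {
        queue.add(END_OF_STREAM);
    }

    @Override
    public int read() {
        return ensureAvailable() ? current.get() & 0xFF : -1;
    }

    // Same shape as ReadableByteChannel#read: copy into the caller's (possibly
    // direct) buffer without staging through a byte[]; -1 at end of stream.
    public int read(ByteBuffer dst) {
        if (!dst.hasRemaining())
            return 0;
        if (!ensureAvailable())
            return -1;
        int n = Math.min(dst.remaining(), current.remaining());
        ByteBuffer slice = current.duplicate();
        slice.limit(slice.position() + n);
        dst.put(slice);
        current.position(current.position() + n);
        return n;
    }

    // Move to the next non-empty buffer, blocking if needed; false at end of stream.
    private boolean ensureAvailable() {
        while (!current.hasRemaining()) {
            if (current == END_OF_STREAM)
                return false;
            current = nextBuffer();
        }
        return true;
    }

    private ByteBuffer nextBuffer() {
        synchronized (lock) {
            // Resume before blocking once the backlog has drained to the low mark.
            if (!autoRead && queue.size() <= lowWaterMark) {
                autoRead = true;
                setAutoRead.accept(true);
            }
        }
        try {
            return queue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for data", e);
        }
    }
}
```

Pausing only happens while at least {{highWaterMark}} buffers are queued, and the reader re-checks under the same lock before every blocking {{take()}}, so it cannot end up waiting on an empty queue with auto-read switched off.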


There are a few minor wrinkles to iron out, but they shouldn't hold up review. 
For example, {{CompressedStreamWriter#write}} casts the {{DataOutputStreamPlus}} to 
{{ByteBufDataOutputStreamPlus}} to make sure we can write a {{ByteBuf}} to it 
(writing a {{ByteBuffer}} would cause a copy into the backing buffer, which I'd 
like to avoid). There are also some comments from PR2 that I haven't addressed 
yet, such as checksums and the timing problem with {{#waitUntilWritable()}}, but I 
wanted to get the streams situation under better control first.
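One way to make that cast less brittle, sketched here under assumptions: probe for a zero-copy capability with {{instanceof}} and fall back to copying, instead of casting unconditionally (which would throw {{ClassCastException}} for any other stream type). {{BufferHandoff}}, {{ChunkWriter}}, and {{RecordingHandoffOutput}} are hypothetical names, and {{ByteBuffer}} stands in for netty's {{ByteBuf}}.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical capability: the output takes ownership of the buffer instead of
// copying its bytes into an internal buffer.
interface BufferHandoff {
    void handOff(ByteBuffer buf);
}

final class ChunkWriter {
    private ChunkWriter() {}

    // Returns true if the zero-copy path was taken, false if the bytes were copied.
    static boolean writeChunk(OutputStream out, ByteBuffer chunk) {
        if (out instanceof BufferHandoff) {
            ((BufferHandoff) out).handOff(chunk); // no copy: ownership moves to the output
            return true;
        }
        // Fallback for any other stream: copy the remaining bytes out and write them.
        byte[] copy = new byte[chunk.remaining()];
        chunk.get(copy);
        try {
            out.write(copy);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return false;
    }
}

// Hypothetical zero-copy output used to exercise the fast path; it only records
// the buffers it was handed.
final class RecordingHandoffOutput extends OutputStream implements BufferHandoff {
    final List<ByteBuffer> received = new ArrayList<>();

    @Override
    public void handOff(ByteBuffer buf) {
        received.add(buf);
    }

    @Override
    public void write(int b) {
        received.add(ByteBuffer.wrap(new byte[] { (byte) b }));
    }
}
```

The trade-off is an extra type check per chunk in exchange for not failing on outputs that lack the zero-copy path.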

> Move streaming to non-blocking IO and netty (streaming 2.1)
> -----------------------------------------------------------
>
>                 Key: CASSANDRA-12229
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-12229
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Streaming and Messaging
>            Reporter: Jason Brown
>            Assignee: Jason Brown
>             Fix For: 4.0
>
>
> As followup work to CASSANDRA-8457, we need to move streaming to use netty.
> Streaming 2.0 (CASSANDRA-5286) brought many good improvements to how files 
> are transferred between nodes in a cluster. However, the low-level details of 
> the current streaming implementation do not line up nicely with a 
> non-blocking model, so I think this is a good time to review some of those 
> details and add in additional goodness. The current implementation assumes a 
> sequential or "single threaded" approach to the sending of stream messages as 
> well as the transfer of files. In short, after several iterative prototypes, 
> I propose the following:
> 1) use a single bi-directional connection (instead of requiring two 
> sockets & two threads)
> 2) send the "non-file" {{StreamMessage}} s (basically anything not 
> {{OutboundFileMessage}}) via the normal internode messaging. This will 
> require a slight bit more management of the session (the ability to look up a 
> {{StreamSession}} from a static function on {{StreamManager}}), but we 
> have most of the pieces we need for this already.
> 3) switch to a non-blocking IO model (facilitated via netty)
> 4) Allow files to be streamed in parallel (CASSANDRA-4663) - this should just 
> be a thing already
> 5) If the entire sstable is to be streamed, in addition to the DATA component, 
> transfer all the components of the sstable (primary index, bloom filter, 
> stats, and so on). This way we can avoid the CPU and GC pressure from 
> deserializing the stream into objects. File streaming then amounts to a 
> block-level transfer.
> Note: The progress/results of CASSANDRA-11303 will need to be reflected here, 
> as well.
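Item 2 above needs a way to find the {{StreamSession}} that an incoming non-file message belongs to. A minimal sketch of such a static lookup, assuming sessions are keyed by a plan id plus the peer address; {{SessionRegistry}} and {{Session}} are hypothetical placeholders, not {{StreamManager}}'s actual API. It uses a nested record, so it needs Java 16 or later.

```java
import java.net.InetSocketAddress;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical placeholder for a streaming session.
final class Session {
    final UUID planId;
    final InetSocketAddress peer;

    Session(UUID planId, InetSocketAddress peer) {
        this.planId = planId;
        this.peer = peer;
    }
}

// Hypothetical static registry that lets the messaging layer route a
// non-file stream message to its session.
final class SessionRegistry {
    // One plan can stream with several peers, so key on both.
    private record Key(UUID planId, InetSocketAddress peer) {}

    private static final ConcurrentMap<Key, Session> SESSIONS = new ConcurrentHashMap<>();

    private SessionRegistry() {}

    // Registers the session, or returns the one already registered for the same key.
    static Session register(Session session) {
        Session existing = SESSIONS.putIfAbsent(new Key(session.planId, session.peer), session);
        return existing != null ? existing : session;
    }

    static Optional<Session> find(UUID planId, InetSocketAddress peer) {
        return Optional.ofNullable(SESSIONS.get(new Key(planId, peer)));
    }

    // Removes the mapping only if it still points at this exact session.
    static void remove(Session session) {
        SESSIONS.remove(new Key(session.planId, session.peer), session);
    }
}
```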



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
