[ 
https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15288861#comment-15288861
 ] 

Stefania commented on CASSANDRA-11521:
--------------------------------------

h3. Benchmark results

These are the original results of the benchmark defined in CASSANDRA-11542; 
measurements are in seconds:

||15M rows||SCHEMA 1|| ||SCHEMA 3|| ||
||Test||Time||Std. Dev||Time||Std. Dev||
|parquet_rdd|4.74|0.23|9.04|0.28|
|parquet_df|2.02|0.82|4.86|0.50|
|csv_rdd|11.36|1.99|10.14|0.64|
|csv_df|13.17|0.45|15.93|1.61|
|cassandra_rdd|*40.81*|0.80|*23.58*|0.53|
|cassandra_rdd_stream|33.24|0.53|19.34|0.29|
|cassandra_df|*26.07*|0.73|*16.75*|0.88|
|cassandra_df_stream|19.39|2.18|13.19|1.71|

And these are the results with the initial streaming proof of concept and 
client optimization patches applied:

||15M rows||SCHEMA 1|| ||SCHEMA 3|| ||
||Test||Time||Std. Dev||Time||Std. Dev||
|parquet_rdd|4.58|0.23|8.85|0.63|
|parquet_df|2.69|2.23|4.94|0.27|
|csv_rdd|10.70|0.43|11.04|1.00|
|csv_df|14.02|1.01|14.75|0.43|
|cassandra_rdd|*26.60*|2.50|*16.14*|0.28|
|cassandra_rdd_stream|*15.91*|0.33|*13.06*|0.72|
|cassandra_df|21.20|0.86|15.15|1.27|
|cassandra_df_stream|13.04|0.87|11.18|0.54|

These are the measurements with this ticket's 
[patch|https://github.com/apache/cassandra/compare/trunk...stef1927:11521] 
applied:

||15M rows||SCHEMA 1|| ||SCHEMA 3|| ||
||Test||Time||Std. Dev||Time||Std. Dev||
|parquet_rdd|5.36|2.26|8.46|0.58|
|parquet_df|1.36|0.32|4.79|0.61|
|csv_rdd|9.61|1.01|10.10|0.59|
|csv_df|12.51|0.50|14.31|0.38|
|cassandra_rdd|*18.73*|0.68|*14.74*|0.92|
|cassandra_rdd_stream|*17.50*|0.72|*13.55*|0.96|
|cassandra_df|15.68|1.15|13.57|2.40|
|cassandra_df_stream|14.73|0.87|13.00|3.05|

Please refer to this 
[comment|https://issues.apache.org/jira/browse/CASSANDRA-11542?focusedCommentId=15238919&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15238919]
 and the following one for a description of the schema and RDD/DF test types.

The streaming results show a slight degradation in performance compared to the 
proof of concept, because I've modified the streaming proof of concept to use 
the exact [same 
code|https://github.com/apache/cassandra/compare/trunk...stef1927:11521#diff-468c74b80d7a1d5b21948217659af747R1]
 as the optimized paging approach. The only difference is that in one case we 
send each page immediately, whilst in the other case we store each page in a 
bounded queue and let the client take it when it is ready.

I've also added a new pager that keeps the iterators open; it is used in both 
approaches. The bounded queue currently holds a maximum of 3 pages. If the 
queue is full, the reader's thread is blocked, but in the final patch we would 
have to interrupt the query and release resources if the client is too slow. 
Also, each page is pre-encoded into a Netty byte buffer, so there is no 
additional GC overhead, although memory usage does increase. Currently this 
byte buffer is copied when the message is sent; in a final patch we could try 
to avoid this copy.
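
For illustration only, here is a minimal sketch of the bounded queue behaviour 
described above; the class and method names are hypothetical, only the 
capacity of 3 pages and the blocking behaviour come from the description:

{code:java}
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import io.netty.buffer.ByteBuf;

// Hypothetical sketch, not the actual patch: the reader thread offers
// pre-encoded pages and blocks once 3 pages are queued; the network side
// takes a page whenever the client is ready for it.
final class BoundedPageQueue
{
    private final BlockingQueue<ByteBuf> pages = new ArrayBlockingQueue<>(3);

    // Called by the reader thread; blocks while the queue is full.
    void addPage(ByteBuf encodedPage) throws InterruptedException
    {
        pages.put(encodedPage);
    }

    // Called when the client is ready for the next page; null if none is available yet.
    ByteBuf nextPage()
    {
        return pages.poll();
    }
}
{code}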

h3. Unit tests results

Further, below are the results of some [unit 
tests|https://github.com/apache/cassandra/compare/trunk...stef1927:11521#diff-04e8835163e2a326515d61f448a8ebbcR1]
 that create an in-process socket connection and retrieve full tables; 
measurements are in milliseconds:

||Part. size||Tot. rows in table||Num. clustering columns||Page size||Streaming (ms)||Optimized page-by-page (ms)||
|1 KB|1000|1|100|3|3|
|10 KB|1000|1|100|9|13|
|64 KB|1000|1|100|57|81|
|10 KB|100000|100|5000|86|87|

h3. Observations

The worst performance degradation of optimized paging vs. streaming 
(approximately 40%) is seen in the unit tests above, with large partitions and 
small page sizes. It should be noted that in the unit tests we retrieve the 
full table and do very little row processing, whilst in the benchmark we 
retrieve multiple token ranges in parallel and there is significant row 
processing done client side.

In the benchmark results there is a slight performance degradation for 
optimized paging, but in my opinion it is too close to the standard deviation 
to matter, _at least right now_.

[~tjake] raises very valid points:

bq. I'm concerned this feature will cause a lot of heap pressure since it's 
basically subverting paging. If we added a global bulk request memory space 
perhaps OOM could be avoided that way (similar to our new page cache).

As mentioned above, there is no heap pressure in terms of GC activity, because 
the pages are stored in Netty direct byte buffers, but the total memory used 
does increase. We most likely need a mechanism to limit the total amount of 
memory used by these optimized queries, and to evict old pages that have not 
been claimed.
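
As a rough illustration, one possible accounting mechanism could look like the 
sketch below; all names and the way the limit is enforced are hypothetical and 
not part of the patch:

{code:java}
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: a global budget for pages held in direct byte buffers.
// A request reserves space before encoding a page and releases it once the
// page has been sent (or evicted because the client never claimed it).
final class BulkReadMemoryBudget
{
    private final long limitBytes;
    private final AtomicLong reservedBytes = new AtomicLong();

    BulkReadMemoryBudget(long limitBytes)
    {
        this.limitBytes = limitBytes;
    }

    // Returns false if encoding another page would exceed the global limit,
    // in which case the caller should wait, evict old pages or abort.
    boolean tryReserve(long pageBytes)
    {
        while (true)
        {
            long current = reservedBytes.get();
            if (current + pageBytes > limitBytes)
                return false;
            if (reservedBytes.compareAndSet(current, current + pageBytes))
                return true;
        }
    }

    // Called once the page has been flushed to the client or evicted.
    void release(long pageBytes)
    {
        reservedBytes.addAndGet(-pageBytes);
    }
}
{code}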

bq. As for queuing pages, if you are always going to fetch up to N pages why 
not just make the page size N times larger for bulk requests?

There is a limit of 256 MB on the message size, but increasing the page size 
definitely improves the performance of the optimized paging approach. However, 
we also want the client to start decoding as soon as possible, to limit client 
idle time, so very large pages do not help in this regard.

bq. In order to detect the speed of the client you can use the 
Channel.isWritable to see if the client isn't able to keep up with the write 
watermark see https://issues.apache.org/jira/browse/CASSANDRA-11082

That's a good point and there is also a Netty handler for this purpose, the 
[{{ChunkedWriteHandler}}|http://netty.io/4.0/api/io/netty/handler/stream/ChunkedWriteHandler.html].
 It would be interesting to see if we can get it to work before writing our own.
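
For context only, installing the handler and checking writability could look 
roughly like the sketch below (hypothetical wrapper class, not the actual 
patch):

{code:java}
import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.stream.ChunkedWriteHandler;

// Hypothetical sketch: ChunkedWriteHandler only pulls and writes chunks while
// the channel is writable, so slow clients apply back-pressure to the server.
final class SlowClientHandling
{
    static void install(ChannelPipeline pipeline)
    {
        pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
    }

    // Alternative suggested in CASSANDRA-11082: consult the write watermark
    // directly before pushing another page to the channel.
    static boolean canSendNextPage(Channel channel)
    {
        return channel.isWritable();
    }
}
{code}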

h3. Moving forward

My inclination is that, although optimized paging could deliver almost the same 
performance for the time being, it could become a limiting factor moving 
forward. Also, having coded both approaches, I find that optimized paging 
doesn't reduce complexity server side. I would prefer a streaming approach with 
a STREAM command that carries a query: it allows starting and canceling 
streaming, makes it clear to the client that not all query features may be 
available, and leaves room for further optimizations later on. Internally, we 
can leverage the {{SelectStatement}} code when required, to avoid duplication.

So overall, I would suggest the following:

- Store the entire query result in an off-heap memory space (we could even 
consider memory mapped files if memory usage is a concern). This allows 
releasing sstables quickly and decouples the server from the client. If we are 
not using memory mapped files we need a mechanism to limit the total memory 
used and release old pages that have not been claimed by clients.

- Implement a Netty 
[{{ChunkedInput}}|http://netty.io/4.0/api/io/netty/handler/stream/ChunkedInput.html]
 that feeds from this memory space, ideally with zero copy, although this may 
not be possible if we use memory mapped files rather than Netty byte buffers. 
Each chunk would be a page. Then we add a {{ChunkedWriteHandler}} to the 
channel pipeline; this should take care of handling slow clients server side 
for us, but if it doesn't work we'll have to write our own handler based on 
{{Channel.isWritable}} (see the sketch after this list).

- On the client we need a unique stream identifier, both to identify the 
callback and to cancel an ongoing streaming operation if needed. We also need 
to process chunks in order, so a sequence number is required.

- A chunk is a page with a valid paging state, so that if a client interrupts 
streaming it can still resume later on. For the time being a chunk contains 
CQL rows; later we may consider a more optimized columnar format.

- We need at least one new request message, STREAM, for initiating and 
canceling streaming sessions. We can respond with {{Result.Rows}}, but we need 
to add a flag to {{Result.Metadata}} indicating that this is a streaming 
response and that the streaming properties are therefore present; currently 
these are the unique stream identifier and the sequence number.
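
As a rough sketch of the {{ChunkedInput}} idea from the list above, assuming 
the Netty 4.0 interface linked earlier; the class name and the way pages are 
sourced are hypothetical:

{code:java}
import java.util.Queue;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.stream.ChunkedInput;

// Hypothetical sketch: each chunk is one pre-encoded page taken from the
// off-heap result space; ChunkedWriteHandler only calls readChunk() while
// the channel is writable, which throttles slow clients for us.
final class PagedResultInput implements ChunkedInput<ByteBuf>
{
    private final Queue<ByteBuf> pages;
    private volatile boolean lastPageQueued;

    PagedResultInput(Queue<ByteBuf> pages)
    {
        this.pages = pages;
    }

    public boolean isEndOfInput()
    {
        return lastPageQueued && pages.isEmpty();
    }

    public ByteBuf readChunk(ChannelHandlerContext ctx)
    {
        return pages.poll(); // null simply means "no page ready yet"
    }

    public void close()
    {
        ByteBuf page;
        while ((page = pages.poll()) != null)
            page.release(); // release pages the client never consumed
    }

    void finish()
    {
        lastPageQueued = true;
    }
}
{code}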

> Implement streaming for bulk read requests
> ------------------------------------------
>
>                 Key: CASSANDRA-11521
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-11521
>             Project: Cassandra
>          Issue Type: Sub-task
>          Components: Local Write-Read Paths
>            Reporter: Stefania
>            Assignee: Stefania
>             Fix For: 3.x
>
>
> Allow clients to stream data from a C* host, bypassing the coordination layer 
> and eliminating the need to query individual pages one by one.


