[
https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15288861#comment-15288861
]
Stefania commented on CASSANDRA-11521:
--------------------------------------
h3. Benchmark results
These are the original results of the benchmark defined in CASSANDRA-11542;
measurements are in seconds:
||15M rows||SCHEMA 1|| ||SCHEMA 3|| ||
||Test||Time||Std. Dev||Time||Std. Dev||
|parquet_rdd|4.74|0.23|9.04|0.28|
|parquet_df|2.02|0.82|4.86|0.50|
|csv_rdd|11.36|1.99|10.14|0.64|
|csv_df|13.17|0.45|15.93|1.61|
|cassandra_rdd|*40.81*|0.80|*23.58*|0.53|
|cassandra_rdd_stream|33.24|0.53|19.34|0.29|
|cassandra_df|*26.07*|0.73|*16.75*|0.88|
|cassandra_df_stream|19.39|2.18|13.19|1.71|
And these are the results with the initial streaming proof of concept and
client optimization patches applied:
||15M rows||SCHEMA 1|| ||SCHEMA 3|| ||
||Test||Time||Std. Dev||Time||Std. Dev||
|parquet_rdd|4.58|0.23|8.85|0.63|
|parquet_df|2.69|2.23|4.94|0.27|
|csv_rdd|10.70|0.43|11.04|1.00|
|csv_df|14.02|1.01|14.75|0.43|
|cassandra_rdd|*26.60*|2.50|*16.14*|0.28|
|cassandra_rdd_stream|*15.91*|0.33|*13.06*|0.72|
|cassandra_df|21.20|0.86|15.15|1.27|
|cassandra_df_stream|13.04|0.87|11.18|0.54|
These are the measurements with this ticket's
[patch|https://github.com/apache/cassandra/compare/trunk...stef1927:11521]
applied:
||15M rows||SCHEMA 1|| ||SCHEMA 3|| ||
||Test||Time||Std. Dev||Time||Std. Dev||
|parquet_rdd|5.36|2.26|8.46|0.58|
|parquet_df|1.36|0.32|4.79|0.61|
|csv_rdd|9.61|1.01|10.10|0.59|
|csv_df|12.51|0.50|14.31|0.38|
|cassandra_rdd|*18.73*|0.68|*14.74*|0.92|
|cassandra_rdd_stream|*17.50*|0.72|*13.55*|0.96|
|cassandra_df|15.68|1.15|13.57|2.40|
|cassandra_df_stream|14.73|0.87|13.00|3.05|
Please refer to this
[comment|https://issues.apache.org/jira/browse/CASSANDRA-11542?focusedCommentId=15238919&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15238919]
and the following one for a description of the schema and RDD/DF test types.
The streaming results show a slight degradation in performance compared to the
proof of concept because I've modified the streaming proof of concept to use
the exact [same code|https://github.com/apache/cassandra/compare/trunk...stef1927:11521#diff-468c74b80d7a1d5b21948217659af747R1]
as the optimized paging approach. The only difference is that in one case we
send each page immediately, whilst in the other case we store each page in a
bounded queue and let the client take it when it is ready.
I've also added a new pager that keeps the iterators open; it is used in both
approaches. The bounded queue currently contains a maximum of 3 pages. If the
queue is full the reader's thread blocks, but in the final patch we would have
to interrupt the read and release resources if the client is too slow. Also,
each page is pre-encoded into a Netty byte buffer, so there isn't any additional
GC overhead, although memory usage does increase. Currently this byte buffer is
copied when the message is sent; in a final patch we could try to avoid this copy.
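For illustration, here is a minimal sketch of the bounded-queue hand-off described above, assuming a queue of pre-encoded Netty buffers; the class and method names are hypothetical, not the ones used in the patch:
{code:java}
import io.netty.buffer.ByteBuf;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch of the bounded hand-off between the reader thread and the
// client connection: the reader blocks once MAX_PENDING_PAGES pre-encoded pages
// are queued, and the network side drains pages as the client claims them.
public class PendingPages
{
    private static final int MAX_PENDING_PAGES = 3;

    private final BlockingQueue<ByteBuf> pages = new ArrayBlockingQueue<>(MAX_PENDING_PAGES);

    // Called by the reader thread; blocks when the client is slower than the reads.
    // A final patch would have to time out and release resources instead of blocking forever.
    public void addPage(ByteBuf encodedPage) throws InterruptedException
    {
        pages.put(encodedPage);
    }

    // Called when the connection is ready to send the next page; returns null if none is queued.
    public ByteBuf nextPage()
    {
        return pages.poll();
    }
}
{code}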
h3. Unit test results
Further, below are the results of some [unit tests|https://github.com/apache/cassandra/compare/trunk...stef1927:11521#diff-04e8835163e2a326515d61f448a8ebbcR1]
that create an in-process socket connection and retrieve full tables;
measurements are in milliseconds:
||Part. size||Tot. rows in table||Num. clustering columns||Page size||Streaming (ms)||Optimized page-by-page (ms)||
|1KB|1000|1|100|3|3|
|10 KB|1000|1|100|9|13|
|64 KB|1000|1|100|57|81|
|10 KB|100000|100|5000|86|87|
h3. Observations
The worst performance degradation of optimized paging vs. streaming
(approximately 40%) is seen in the unit tests above, with large partitions and
small page sizes. It should be noted that in the unit tests we retrieve the
full table and do little row processing, whilst in the benchmark we retrieve
multiple token ranges in parallel and there is significant row processing done
client side.
In the benchmark results, although there is a slight degradation in performance
for optimized paging, it is my opinion that the difference is too close to the
standard deviation to matter _at least right now_.
[~tjake] raises very valid points:
bq. I'm concerned this feature will cause a lot of heap pressure since it's
basically subverting paging. If we added a global bulk request memory space
perhaps OOM could be avoided that way (similar to our new page cache).
As mentioned above, there is no heap pressure in terms of GC activity because
the pages are stored in Netty direct byte buffers, but the total memory used
does increase. We most likely need a mechanism to limit the total amount of
memory used by these optimized queries and to evict old pages that have not
been claimed.
bq. As for queuing pages, if you are always going to fetch up to N pages why
not just make the page size N times larger for bulk requests?
There is a limit of 256 MB on the message size, but increasing the page size
definitely improves the performance of the optimized paging approach. However,
we also want the client to start decoding as soon as possible to limit
client-side idle time, so very large pages do not help in this regard.
bq. In order to detect the speed of the client you can use the
Channel.isWritable to see if the client isn't able to keep up with the write
watermark see https://issues.apache.org/jira/browse/CASSANDRA-11082
That's a good point; there is also a Netty handler for this purpose, the
[{{ChunkedWriteHandler}}|http://netty.io/4.0/api/io/netty/handler/stream/ChunkedWriteHandler.html].
It would be interesting to see whether we can make it work before writing our own handler.
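For reference, this is roughly what a hand-rolled check based on {{Channel.isWritable}} could look like; a sketch only, with assumed names, since the real back-pressure logic would live in the native protocol layer:
{code:java}
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;

// Hypothetical sketch: only flush the next page while the channel's outbound buffer
// is below its high write watermark; otherwise wait for a channelWritabilityChanged
// event before resuming, instead of buffering unbounded data for a slow client.
public final class PageWriter
{
    private PageWriter()
    {
    }

    public static boolean tryWritePage(Channel channel, ByteBuf encodedPage)
    {
        if (!channel.isWritable())
            return false; // the client is not keeping up; resume on channelWritabilityChanged

        channel.writeAndFlush(encodedPage);
        return true;
    }
}
{code}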
h3. Moving forward
My inclination is that, although optimized paging could deliver almost the same
performance for the time being, it could become a limiting factor moving
forward. Also, having coded both approaches, I consider that optimized paging
doesn't reduce complexity server side. I would prefer a streaming approach with
a STREAM command that carries a query: it allows starting and canceling
streaming, makes it clear to the client that not all query features may be
available, and leaves room for further optimizations later on. Internally, we
can leverage the SelectStatement code when required, to avoid duplication.
So overall, I would suggest the following:
- Store the entire query result in an off-heap memory space (we could even
consider memory-mapped files if memory usage is a concern). This allows
releasing sstables quickly and decouples the server from the client. If we are
not using memory-mapped files, we need a mechanism to limit the total memory
used and to release old pages that have not been claimed by clients.
- Implement a Netty
[{{ChunkedInput}}|http://netty.io/4.0/api/io/netty/handler/stream/ChunkedInput.html]
that feeds from this memory space, ideally with zero copy; this may not be
possible if we use memory-mapped files rather than Netty byte buffers. Each
chunk would be a page. Then we add a {{ChunkedWriteHandler}} to the channel
pipeline. This should take care of handling slow clients server side for us,
but if it doesn't work we'll have to write our own handler based on
{{Channel.isWritable}}. A rough sketch of such a {{ChunkedInput}} follows this list.
- On the client we need a unique stream identifier that can be used to identify
the callback and to cancel an ongoing streaming operation if needed. We also
need to process chunks in order, so a sequential number is required.
- A chunk is a page with a valid paging state, so that if a client interrupts
streaming it can still resume later on. For the time being a chunk contains
CQL rows; later we may consider a more optimized columnar format.
- We need at least one new request message, STREAM, for initiating and canceling
streaming sessions. We can respond with Result.Rows, but we need to add a flag
to Result.Metadata indicating that this is a streaming response and therefore
that the streaming properties, currently the unique identifier and the
sequential number, are present.
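To make the second point above more concrete, here is a rough sketch of what such a {{ChunkedInput}} could look like against Netty 4.0's interface. The class and its helper methods are illustrative assumptions, not existing code:
{code:java}
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.stream.ChunkedInput;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch of a ChunkedInput serving pre-encoded pages from the off-heap
// result space; each chunk is one page, and a ChunkedWriteHandler in the pipeline
// only pulls chunks while the channel remains writable.
public class PagedResultInput implements ChunkedInput<ByteBuf>
{
    private final Queue<ByteBuf> pages = new ConcurrentLinkedQueue<>();
    private volatile boolean complete; // set once the query has produced its last page

    public void addPage(ByteBuf encodedPage)
    {
        pages.add(encodedPage);
    }

    public void markComplete()
    {
        complete = true;
    }

    @Override
    public boolean isEndOfInput()
    {
        return complete && pages.isEmpty();
    }

    @Override
    public ByteBuf readChunk(ChannelHandlerContext ctx)
    {
        // Returning null tells the ChunkedWriteHandler there is nothing to write yet.
        return pages.poll();
    }

    @Override
    public void close()
    {
        // Release any pages the client never claimed.
        ByteBuf page;
        while ((page = pages.poll()) != null)
            page.release();
    }
}
{code}
With a {{ChunkedWriteHandler}} in the pipeline, writing such an object to the channel would let Netty pace the page writes according to the channel's writability, which is exactly the slow-client handling we want server side.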
> Implement streaming for bulk read requests
> ------------------------------------------
>
> Key: CASSANDRA-11521
> URL: https://issues.apache.org/jira/browse/CASSANDRA-11521
> Project: Cassandra
> Issue Type: Sub-task
> Components: Local Write-Read Paths
> Reporter: Stefania
> Assignee: Stefania
> Fix For: 3.x
>
>
> Allow clients to stream data from a C* host, bypassing the coordination layer
> and eliminating the need to query individual pages one by one.