[ 
https://issues.apache.org/jira/browse/CASSANDRA-11542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15273884#comment-15273884
 ] 

Stefania commented on CASSANDRA-11542:
--------------------------------------

I've fixed the benchmark to ensure all jobs have a similar number of Spark 
tasks. This was accomplished by reducing the number of HDFS blocks and 
decreasing the Cassandra split size in the connector. All jobs now have 
approximately 35 Spark tasks, of which 10 execute in parallel (5 nodes with 2 
executors each).
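
For reference, this is roughly what the two knobs look like; a minimal sketch 
only, with illustrative values and the 1.6-era connector property name, not 
necessarily the exact settings used by the benchmark:

{code:scala}
import org.apache.spark.{SparkConf, SparkContext}

object TaskTuning {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("spark-load-perf")
      .set("spark.cassandra.connection.host", "127.0.0.1")
      // smaller splits => more Spark tasks per Cassandra table scan (default is 64 MB)
      .set("spark.cassandra.input.split.size_in_mb", "16")

    val sc = new SparkContext(conf)

    // larger blocks => fewer HDFS blocks, hence fewer tasks for the Parquet/CSV
    // jobs (takes effect when the input files are written)
    sc.hadoopConfiguration.set("dfs.blocksize", (256L * 1024 * 1024).toString)
  }
}
{code}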

Here are the results for schemas 1 and 3:

||15M rows||SCHEMA 1|| ||SCHEMA 3|| ||
||Test||Time||Std. Dev||Time||Std. Dev||
|parquet_rdd|4.74|0.23|9.04|0.28|
|parquet_df|2.02|0.82|4.86|0.50|
|csv_rdd|11.36|1.99|10.14|0.64|
|csv_df|13.17|0.45|15.93|1.61|
|cassandra_rdd|*40.81*|0.80|*23.58*|0.53|
|cassandra_rdd_stream|33.24|0.53|19.34|0.29|
|cassandra_df|*26.07*|0.73|*16.75*|0.88|
|cassandra_df_stream|19.39|2.18|13.19|1.71|

As expected, the Cassandra numbers have improved significantly due to the 
increased parallelism. 

I've also performed some client-side optimizations, identified by analyzing 
the JFR files recorded during the last run; the results are as follows:

||15M rows||SCHEMA 1|| ||SCHEMA 3|| ||
||Test||Time||Std. Dev||Time||Std. Dev||
|parquet_rdd|4.58|0.23|8.85|0.63|
|parquet_df|2.69|2.23|4.94|0.27|
|csv_rdd|10.70|0.43|11.04|1.00|
|csv_df|14.02|1.01|14.75|0.43|
|cassandra_rdd|26.60|2.50|16.14|0.28|
|cassandra_rdd_stream|*15.91*|0.33|*13.06*|0.72|
|cassandra_df|21.20|0.86|15.15|1.27|
|cassandra_df_stream|*13.04*|0.87|*11.18*|0.54|

When both streaming and client optimizations are in place, the performance is 
considerably better. The overall percentage improvements by test and schema are 
as follows:

||Percentage reduction in time after optimization||SCHEMA 1|| ||SCHEMA 3|| ||
|| ||RDD||DF||RDD||DF||
|Total|61.01%|49.98%|44.62%|33.25%|
|Client optimizations|52.13%|32.75%|32.48%|15.27%|
|Streaming (after client optimizations)|40.17%|38.51%|19.10%|26.22%|
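
For clarity, the percentages are derived from the two tables above: _Total_ 
compares the original blocking run with the final streaming run after client 
optimizations, e.g. for RDD / schema 1: (40.81 - 15.91) / 40.81 ≈ 61%; 
_Client optimizations_ compares the streaming variant before and after the 
client changes: (33.24 - 15.91) / 33.24 ≈ 52%; _Streaming (after client 
optimizations)_ compares the optimized blocking and streaming variants: 
(26.60 - 15.91) / 26.60 ≈ 40%.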

Raw result data is attached as [^spark-load-perf-results-003.zip].

The client improvements are available 
[here|https://github.com/stef1927/spark-cassandra-connector/commit/1490178b9c166dc9e6c38f63be5eb7232e73ddd8]; 
this is a quick summary:

* The cache of type decoders in the driver is extremely slow; saving the 
decoders in an array for each RDD computation is by far the most significant 
factor (see the sketch after this list).
* {{GettableData}} creates a map, 
[{{_indexOf}}|https://github.com/stef1927/spark-cassandra-connector/blob/9259/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/GettableData.scala#L16],
 for every single row, even though this map does not change across rows. In the 
optimizations I've replaced {{CassandraRow}} with {{GettableByIndexData}} for 
case classes, but this needs further work.
* For the non-streaming case, the query may not necessarily reach a replica 
(token-aware routing only works with partition keys, not with tokens, and only 
the first set of tasks lands on the preferred Spark location; subsequent tasks 
may not).
* The Spark metrics have been disabled, as they contributed somewhat to the 
total decoding time; we need a way to calculate the binary row size without 
iterating over the row's column ByteBuffers again.
* Scala for loops are compiled to {{range.foreach}}, and as far as I 
understand the closure passed to foreach is not inlined, so I think we need to 
be careful with for loops in the critical path. They definitely show up quite 
high in JFR, but I cannot say that replacing a single for loop has a measurable 
impact on the total time.

Further client-side optimization is possible; I would suggest integrating the 
connector with the driver at a very low level in order to convert 
{{ByteBuffers}} directly into Scala types. In addition, the improvements I've 
already implemented will need further refinement. However, in the JFR 
recordings taken after these optimizations, the Spark executors now spend 66% 
of the time waiting and 33% of the time processing rows, whilst the NIO workers 
spend 95% of their time waiting. Therefore I suggest moving on to server-side 
optimizations, starting with CASSANDRA-11521.
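
To illustrate what converting {{ByteBuffers}} directly into Scala types could 
look like, here is a purely hypothetical example; the native protocol encodes 
{{int}} and {{bigint}} as 4 and 8 big-endian bytes respectively, and the real 
integration would have to live inside the driver/connector rather than in user 
code:

{code:scala}
import java.nio.ByteBuffer

object DirectCodecs {
  // Read a CQL int (4 big-endian bytes) without boxing or driver-level dispatch.
  def readInt(bb: ByteBuffer): Int = bb.getInt(bb.position())

  // Read a CQL bigint (8 big-endian bytes) the same way.
  def readLong(bb: ByteBuffer): Long = bb.getLong(bb.position())
}
{code}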

This [benchmark|https://github.com/stef1927/spark-load-perf] is ready for 
review, as it will be used to measure any future improvements. [~rspitzer], 
would you be able to quickly review it or find someone interested?

> Create a benchmark to compare HDFS and Cassandra bulk read times
> ----------------------------------------------------------------
>
>                 Key: CASSANDRA-11542
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-11542
>             Project: Cassandra
>          Issue Type: Sub-task
>          Components: Testing
>            Reporter: Stefania
>            Assignee: Stefania
>             Fix For: 3.x
>
>         Attachments: jfr_recordings.zip, spark-load-perf-results-001.zip, 
> spark-load-perf-results-002.zip, spark-load-perf-results-003.zip
>
>
> I propose creating a benchmark for comparing Cassandra and HDFS bulk reading 
> performance. Simple Spark queries will be performed on data stored in HDFS or 
> Cassandra, and the entire duration will be measured. An example query would 
> be the max or min of a column or a count\(*\).
> This benchmark should allow determining the impact of:
> * partition size
> * number of clustering columns
> * number of value columns (cells)



