[ https://issues.apache.org/jira/browse/CASSANDRA-11542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15273884#comment-15273884 ]
Stefania commented on CASSANDRA-11542:
--------------------------------------

I've fixed the benchmark to ensure all jobs have a similar number of Spark tasks. This was accomplished by reducing the number of HDFS blocks and decreasing the Cassandra split size in the connector. All jobs now have approximately 35 Spark tasks, of which 10 execute in parallel (5 nodes with 2 executors each). Here are the results for schemas 1 and 3:

||15M rows||SCHEMA 1|| ||SCHEMA 3|| ||
||Test||Time||Std. Dev||Time||Std. Dev||
|parquet_rdd|4.74|0.23|9.04|0.28|
|parquet_df|2.02|0.82|4.86|0.50|
|csv_rdd|11.36|1.99|10.14|0.64|
|csv_df|13.17|0.45|15.93|1.61|
|cassandra_rdd|*40.81*|0.80|*23.58*|0.53|
|cassandra_rdd_stream|33.24|0.53|19.34|0.29|
|cassandra_df|*26.07*|0.73|*16.75*|0.88|
|cassandra_df_stream|19.39|2.18|13.19|1.71|

As expected, the Cassandra numbers have improved significantly due to the increased parallelism. I've also performed some client-side optimizations, identified by analyzing the JFR files recorded during the last run. The results are as follows:

||15M rows||SCHEMA 1|| ||SCHEMA 3|| ||
||Test||Time||Std. Dev||Time||Std. Dev||
|parquet_rdd|4.58|0.23|8.85|0.63|
|parquet_df|2.69|2.23|4.94|0.27|
|csv_rdd|10.70|0.43|11.04|1.00|
|csv_df|14.02|1.01|14.75|0.43|
|cassandra_rdd|26.60|2.50|16.14|0.28|
|cassandra_rdd_stream|*15.91*|0.33|*13.06*|0.72|
|cassandra_df|21.20|0.86|15.15|1.27|
|cassandra_df_stream|*13.04*|0.87|*11.18*|0.54|

With both streaming and the client optimizations in place, the performance is considerably better. The overall percentage improvements by test and schema are as follows:

||Percentage reduction in time after optimization||SCHEMA 1|| ||SCHEMA 3|| ||
|| ||RDD||DF||RDD||DF||
|Total|61.01%|49.98%|44.62%|33.25%|
|Client optimizations|52.13%|32.75%|32.48%|15.27%|
|Streaming (after client optimizations)|40.17%|38.51%|19.10%|26.22%|

Raw result data is attached as [^spark-load-perf-results-003.zip].
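For reference, the reduction percentages in the last table follow from the times in the two result tables; a minimal sketch of the arithmetic (the helper name is mine, not connector code):

```scala
// Percentage reduction in job time between two benchmark runs.
// `before` and `after` are times taken from the tables above.
def percentReduction(before: Double, after: Double): Double =
  (before - after) / before * 100.0

// Example: cassandra_rdd on schema 1, 40.81 before any optimization
// vs 15.91 with streaming plus client optimizations.
val totalRdd = percentReduction(40.81, 15.91) // ≈ 61.01%
```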
The client improvements are available [here|https://github.com/stef1927/spark-cassandra-connector/commit/1490178b9c166dc9e6c38f63be5eb7232e73ddd8]; this is a quick summary:

* The cache of type decoders in the driver is extremely slow; saving the decoders in an array once per RDD computation is by far the most significant factor.
* {{GettableData}} creates a map, [{{_indexOf}}|https://github.com/stef1927/spark-cassandra-connector/blob/9259/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/GettableData.scala#L16], for every single row, even though this map does not change across rows. In the optimizations I've replaced {{CassandraRow}} with {{GettableByIndexData}} for case classes, but this needs further work.
* In the non-streaming case, the query may not necessarily end up at a replica: token-aware routing only works with partition keys, not with tokens, and only the first set of tasks ends up at the preferred Spark location; subsequent tasks may not.
* The Spark metrics have been disabled, as they contributed somewhat to the total decoding time; we need to calculate the binary row size without iterating over the row's column ByteBuffers again.
* Scala for loops are implemented as {{range.foreach}}, and the closure passed to the foreach is not inlined as far as I understand, so I think we need to be careful with for loops in the critical path. They definitely show up quite high in JFR, but I cannot say that replacing a single for loop has a measurable impact on the total final time.

Further client-side optimization is possible; I would suggest integrating the connector with the driver at a very low level in order to convert {{ByteBuffer}}s directly into Scala types. In addition, the improvements I've already implemented will need further refinement. However, in the JFR recordings taken after these optimizations, the Spark executors now spend 66% of their time waiting and 33% processing rows, whilst the NIO workers spend 95% of their time waiting.
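To illustrate the for-loop point above, here is a minimal, self-contained sketch (not connector code): a Scala for comprehension over a {{Range}} desugars to {{Range.foreach}} with a closure, while an equivalent while loop compiles to a plain loop with no closure allocation:

```scala
// Hypothetical stand-in for hot-path per-row work: summing column sizes.
val sizes = Array(8, 16, 4, 32)

// A for loop over a Range desugars to Range.foreach(closure):
var total1 = 0
for (i <- 0 until sizes.length) total1 += sizes(i)

// An explicit while loop avoids the closure entirely:
var total2 = 0
var i = 0
while (i < sizes.length) { total2 += sizes(i); i += 1 }

// Both compute the same result (60); only the generated bytecode differs.
```

Whether the closure actually costs anything in a given loop depends on JIT inlining and escape analysis, which is why a single replacement may not show up in the end-to-end time.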
Therefore I suggest moving on to server-side optimizations, starting with CASSANDRA-11521. This [benchmark|https://github.com/stef1927/spark-load-perf] is ready for review, as it will be used to measure any future improvements. [~rspitzer], would you be able to quickly review it or find someone interested?

> Create a benchmark to compare HDFS and Cassandra bulk read times
> ----------------------------------------------------------------
>
> Key: CASSANDRA-11542
> URL: https://issues.apache.org/jira/browse/CASSANDRA-11542
> Project: Cassandra
> Issue Type: Sub-task
> Components: Testing
> Reporter: Stefania
> Assignee: Stefania
> Fix For: 3.x
>
> Attachments: jfr_recordings.zip, spark-load-perf-results-001.zip, spark-load-perf-results-002.zip, spark-load-perf-results-003.zip
>
> I propose creating a benchmark for comparing Cassandra and HDFS bulk reading performance. Simple Spark queries will be performed on data stored in HDFS or Cassandra, and the entire duration will be measured. An example query would be the max or min of a column or a count\(*\).
> This benchmark should allow determining the impact of:
> * partition size
> * number of clustering columns
> * number of value columns (cells)

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)