[ 
https://issues.apache.org/jira/browse/CASSANDRA-11542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15249213#comment-15249213
 ] 

Stefania commented on CASSANDRA-11542:
--------------------------------------

I've run [the 
benchmark|https://github.com/stef1927/spark-load-perf/tree/master] described 
above on a 5-node GCE {{n1-standard-8}} cluster (30 GB RAM and 8 virtual cores 
per node, HDDs).

The following schemas were tested:

* {{CREATE TABLE ks.schema1 (id TEXT, timestamp BIGINT, val1 INT, val2 INT, 
val3 INT, val4 INT, val5 INT, val6 INT, val7 INT, val8 INT, val9 INT, val10 
INT, PRIMARY KEY (id, timestamp))}}

* {{CREATE TABLE ks.schema2 (id TEXT, timestamp BIGINT, val1 INT, val2 INT, 
val3 INT, val4 INT, val5 INT, val6 INT, val7 INT, val8 INT, val9 INT, val10 
INT, PRIMARY KEY ((id, timestamp)))}}

* {{CREATE TABLE ks.schema3 (id TEXT, timestamp BIGINT, data TEXT, PRIMARY KEY 
(id, timestamp))}}

* {{CREATE TABLE ks.schema4 (id TEXT, timestamp BIGINT, data TEXT, PRIMARY KEY 
((id, timestamp)))}}

The first two schemas are identical except that the second schema uses a 
composite partition key whilst the first one uses a clustering key. The same is 
true for the third and fourth schemas. The difference between the first two 
schemas and the last two is that the 10 integer values are encoded into a 
single string in the last two schemas. This was done to measure the impact of 
reading multiple values from Cassandra, whilst the impact of clustering rows 
can be determined by comparing schemas one and two, or three and four.
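The exact encoding used for schemas 3 and 4 is in the benchmark repository; as 
a minimal illustration, assuming a simple comma-separated representation, the 
client-side encode/decode looks like:

```python
# Illustrative only: the benchmark's actual encoding lives in the linked
# repository; here we assume a simple comma-separated text representation.

def encode_values(values):
    """Pack the 10 integer values into the single text column (schemas 3/4)."""
    return ",".join(str(v) for v in values)

def decode_values(data):
    """Client-side decoding back into integers when reading schemas 3/4."""
    return [int(s) for s in data.split(",")]

row = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
encoded = encode_values(row)            # "1,2,3,4,5,6,7,8,9,10"
assert decode_values(encoded) == row    # round-trips losslessly
```

With this layout Cassandra stores one cell per row instead of ten, at the cost 
of the client doing the decoding.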

15 million rows of random data were generated and stored in the following 
sources:

* Cassandra
* A CSV file stored in HDFS
* A Parquet file stored in HDFS

After generating the data, the Cassandra tables were flushed and compacted. The 
OS page cache was also flushed after generating the data, and after every test 
run, via {{sync && echo 3 | sudo tee /proc/sys/vm/drop_caches}}. The HDFS files 
were divided into 1000 partitions due to how the data was generated.

The benchmark either retrieves a Spark RDD (Resilient Distributed Dataset) or 
a DF (data frame). The difference between the two is that the RDD contains the 
entire table or file data, whilst the data frame contains only the two columns 
that are used to produce the final result. The following tests were performed 
in random order:

* *Cassandra RDD:* the entire Cassandra table is loaded into an RDD via 
{{sc.cassandraTable}};
* *CSV RDD:* the CSV data is loaded into an RDD via {{sc.textFile}};
* *Parquet RDD:* the Parquet data is loaded into an RDD via 
{{sqlContext.read.parquet}};
* *Cassandra DF:* a SELECT predicate is pushed to the server via 
{{CassandraSQLContext}} to retrieve two columns that are saved into a data 
frame;
* *CSV DF:* the CSV data is loaded into a DF via the Spark SQL context using 
{{com.databricks.spark.csv}} as the format, and two columns are saved in a data 
frame; 
* *Parquet DF:* a SELECT predicate is used via {{SQLContext}} to retrieve two 
columns that are saved into a data frame.

The RDD or DF is then iterated, and the result is calculated by taking the 
global maximum of the per-row maximum of two columns. The time taken to create 
the RDD or DF and to iterate it is then measured.
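Outside Spark, the result computation can be sketched in plain Python (in the 
benchmark itself it is a map over the rows followed by a reduce on the RDD or 
DF; the sample rows below are hypothetical):

```python
# For each row take the maximum of the two columns of interest, then take
# the global maximum of those per-row maxima.

rows = [(3, 7), (10, 2), (5, 5)]  # hypothetical (column_a, column_b) pairs

result = max(max(a, b) for a, b in rows)
print(result)  # 10
```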

h3. RDD Results

*Schema1*

||15M records||Parquet||CSV||Cassandra||Cassandra / CSV||Cassandra / Parquet||
|Run 1|3.494837|5.478472|43.423967|8|12|
|Run 2|2.845326|5.167405|47.170665|9|17|
|Run 3|2.613721|4.904634|48.451015|10|19|
|Average|2.98|5.18|46.35|9|16|
|Std. Dev.|0.46|0.29|2.61| | |
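
The *Average*, *Std. Dev.* and ratio columns can be reproduced from the three 
run times; for example, for the Schema1 RDD runs above (assuming the sample 
standard deviation, which matches the reported values):

```python
from statistics import mean, stdev  # stdev = sample standard deviation

parquet   = [3.494837, 2.845326, 2.613721]
csv       = [5.478472, 5.167405, 4.904634]
cassandra = [43.423967, 47.170665, 48.451015]

print(round(mean(parquet), 2), round(stdev(parquet), 2))      # 2.98 0.46
print(round(mean(cassandra), 2), round(stdev(cassandra), 2))  # 46.35 2.61
print(round(mean(cassandra) / mean(csv)))      # 9  (Cassandra / CSV)
print(round(mean(cassandra) / mean(parquet)))  # 16 (Cassandra / Parquet)
```

The same computation applies to every table below.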


*Schema2*

||15M records||Parquet||CSV||Cassandra||Cassandra / CSV||Cassandra / Parquet||
|Run 1|3.486563|5.635907|46.00437|8|13|
|Run 2|2.68518|5.13979|46.108184|9|17|
|Run 3|2.673291|5.035654|46.076284|9|17|
|Average|2.95|5.27|46.06|9|16|
|Std. Dev.|0.47|0.32|0.05| | |

*Schema3*

||15M records||Parquet||CSV||Cassandra||Cassandra / CSV||Cassandra / Parquet||
|Run 1|6.122885|6.79348|29.643609|4|5|
|Run 2|5.826286|6.563861|32.900336|5|6|
|Run 3|5.751427|6.41375|33.176358|5|6|
|Average|5.90|6.59|31.91|5|5|
|Std. Dev.|0.20|0.19|1.96| | |

*Schema4*

||15M records||Parquet||CSV||Cassandra||Cassandra / CSV||Cassandra / Parquet||
|Run 1|6.137645|7.511649|29.518883|4|5|
|Run 2|5.984526|6.569239|30.723268|5|5|
|Run 3|5.763102|6.590789|30.789137|5|5|
|Average|5.96|6.89|30.34|4|5|
|Std. Dev.|0.19|0.54|0.72| | |


h3. DF Results

*Schema1*

||15M records||Parquet||CSV||Cassandra||Cassandra / CSV||Cassandra / Parquet||
|Run 1|2.843182|15.651141|37.997299|2|13|
|Run 2|2.357436|11.582413|30.836383|3|13|
|Run 3|2.386732|11.75583|30.061433|3|13|
|Average|2.53|13.00|32.97|3|13|
|Std. Dev.|0.27|2.30|4.38| | |


*Schema2*

||15M records||Parquet||CSV||Cassandra||Cassandra / CSV||Cassandra / Parquet||
|Run 1|3.016107|12.484605|95.12199|8|32|
|Run 2|2.455694|12.13422|37.583736|3|15|
|Run 3|2.329835|12.007215|34.966389|3|15|
|Average|2.60|12.21|55.89|5|21|
|Std. Dev.|0.37|0.25|34.00| | |


*Schema3*

||15M records||Parquet||CSV||Cassandra||Cassandra / CSV||Cassandra / Parquet||
|Run 1|5.544086|14.641745|22.944184|2|4|
|Run 2|4.306315|13.056165|22.287305|2|5|
|Run 3|4.265064|12.736621|23.91004|2|6|
|Average|4.71|13.48|23.05|2|5|
|Std. Dev.|0.73|1.02|0.82| | |

*Schema4*

||15M records||Parquet||CSV||Cassandra||Cassandra / CSV||Cassandra / Parquet||
|Run 1|5.057719|13.460405|26.966471|2|5|
|Run 2|4.440278|13.090147|25.700175|2|6|
|Run 3|3.956446|12.57829|24.537605|2|6|
|Average|4.48|13.04|25.73|2|6|
|Std. Dev.|0.55|0.44|1.21| | |

I've attached a zip file with the raw data, [^spark-load-perf-results-001.zip]. 
More details on the benchmark are available in the 
[README|https://github.com/stef1927/spark-load-perf/tree/master].

h3. Conclusions

In the RDD tests, Cassandra is 9 times slower than CSV for the first two 
schemas and 4-5 times slower for the last two schemas. When compared to 
Parquet, it is 16 times slower for the first two schemas and 5 times slower for 
the last two schemas.

In the DF tests, Cassandra is 3 times slower than CSV for the first two schemas 
and about 2 times slower for the third and fourth schemas. When compared to 
Parquet, it is 13-15 times slower for the first two schemas and 5-6 times 
slower for the last two schemas. I've excluded an outlying value, 95.12, from 
the DF tests of schema 2. Extremely large values for Cassandra were observed 3 
times in total; the other 2 values had already been excluded from the final 
data due to unrelated problems with the benchmark.

Storing the data as a text string, with the client decoding the values, is 
faster by approximately a factor of 1.5.

Using clustering columns rather than composite partition keys seems to have 
only a small impact on performance.

h3. Next steps

I intend to modify the Spark Connector to support the streaming proof of 
concept delivered in the parent ticket CASSANDRA-9259. If the impact of 
streaming is as significant as hoped (factor 3 or 4 improvement), then I intend 
to implement the 
[approach|https://issues.apache.org/jira/browse/CASSANDRA-11521?focusedCommentId=15232701&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15232701]
 suggested in CASSANDRA-11521 and compare the two.

I would also like to spend some time understanding the difference in 
performance between the first two schemas and the last two, specifically how 
much of this is due to encoding CQL values vs. reading cells from disk. 
Further, if large outlying values continue to be observed, we need to 
understand the reason for them.

> Create a benchmark to compare HDFS and Cassandra bulk read times
> ----------------------------------------------------------------
>
>                 Key: CASSANDRA-11542
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-11542
>             Project: Cassandra
>          Issue Type: Sub-task
>          Components: Testing
>            Reporter: Stefania
>            Assignee: Stefania
>             Fix For: 3.x
>
>         Attachments: spark-load-perf-results-001.zip
>
>
> I propose creating a benchmark for comparing Cassandra and HDFS bulk reading 
> performance. Simple Spark queries will be performed on data stored in HDFS or 
> Cassandra, and the entire duration will be measured. An example query would 
> be the max or min of a column or a count\(*\).
> This benchmark should allow determining the impact of:
> * partition size
> * number of clustering columns
> * number of value columns (cells)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
