[ 
https://issues.apache.org/jira/browse/CASSANDRA-9259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15400394#comment-15400394
 ] 

Stefania commented on CASSANDRA-9259:
-------------------------------------

Now that CASSANDRA-11521 is ready for review, I've repeated the Spark benchmark 
defined by CASSANDRA-11542 using schema 1:

{code}
CREATE TABLE ks.schema1 (id TEXT, timestamp BIGINT, val1 INT, val2 INT, val3 
INT, val4 INT, val5 INT, val6 INT, val7 INT, val8 INT, val9 INT, val10 INT, 
PRIMARY KEY (id, timestamp))
{code}

and schema 3:

{code}
CREATE TABLE ks.schema3 (id TEXT, timestamp BIGINT, data TEXT, PRIMARY KEY (id, 
timestamp))
{code}

The benchmark measures how many seconds it takes to count rows and to find the 
maximum of two columns for each row, where rows are retrieved either via Spark 
RDDs or Data Frames (DFs). The most significant difference between RDD and DF 
tests is that in the DF tests only the two columns of interest to the Spark 
query are retrieved, whilst in the RDD tests the entire data set is retrieved. 
The data is either stored in Cassandra or in HDFS using CSV or Parquet files.
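
For illustration, the two access paths look roughly as follows. This is a minimal sketch against the Spark Cassandra Connector RDD and DataFrame APIs, assuming a spark-shell style {{sc}} and {{sqlContext}} and the schema 1 table above; it is not the exact code from the benchmark repository.

{code}
import com.datastax.spark.connector._
import org.apache.spark.sql.functions.max

// RDD path: the connector fetches every column of the table,
// even though the query only needs val1 and val2.
val rdd = sc.cassandraTable("ks", "schema1")
val rddCount = rdd.count()
val rddMax = rdd
  .map(r => (r.getInt("val1"), r.getInt("val2")))
  .reduce((a, b) => (math.max(a._1, b._1), math.max(a._2, b._2)))

// DF path: only the two columns of interest are selected, so the
// connector can prune the remaining columns before transferring data.
val df = sqlContext.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "ks", "table" -> "schema1"))
  .load()
val dfCount = df.count()
val dfMax = df.agg(max("val1"), max("val2")).collect()
{code}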

More details on the benchmark are available 
[here|https://issues.apache.org/jira/browse/CASSANDRA-11542?focusedCommentId=15249213&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15249213]
 and the code is available [here|https://github.com/stef1927/spark-load-perf].

Here is the comparison with the results of the benchmark that was run on 6th 
May with 15M rows, as described in [this 
comment|https://issues.apache.org/jira/browse/CASSANDRA-11542?focusedCommentId=15273884&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15273884].
 We can see that the final results are consistent with the proof of concept, 
which was presented at the Cassandra NGCC conference.

!before_after.jpg!

* C* old: trunk with no optimizations (at c662d876b95d67a911dfe549d8a0d38ee6fbb904), and the Spark Connector without SPARK-C383
* C* POC: the [proof-of-concept patch|https://github.com/stef1927/cassandra/commits/9259], and the Spark Connector with an [earlier version|https://github.com/stef1927/spark-cassandra-connector/commits/9259] of SPARK-C383
* C* async: the CASSANDRA-11521 patch, with results delivered to the client via the new asynchronous paging mechanism
* C* sync: the CASSANDRA-11521 patch, with results delivered to the client via the existing synchronous paging mechanism

Here are the results for increasingly large data sets of 15M, 30M, 60M and 120M rows with 256 vnodes:

!256_vnodes.jpg!

Below are the results for the same data sets of 15M, 30M, 60M and 120M rows without vnodes:

!no_vnodes.jpg!


The raw data is attached [^spark_benchmark_raw_data.zip].

h5. Conclusions

* Overall the duration of the 15M row test was improved by 65% (from about 40 
to 14 seconds) for schema 1 and by 56% (from 23 to 10 seconds) for schema 3.

* The new asynchronous paging mechanism significantly outperforms the existing mechanism with large data sets. For example, for schema 1 and 120M rows, it is approximately 30% faster. To achieve this, however, the driver must reserve a connection per asynchronous paging request; sharing connections degrades performance significantly and makes it no better than the existing mechanism (a sketch of the existing synchronous paging path follows this list).

* CSV still outperforms C* for schema 1 RDD tests. However, for DF tests and schema 3 RDD tests, C* is now on par with or faster than CSV. This indicates that the number of cells per CQL row continues to impact performance.

* Parquet is in a league of its own due to its efficient columnar format. It should be noted, however, that it may store the row count in its metadata; a more meaningful benchmark could have been obtained by excluding the row count from the time measurements.
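
To make the sync vs async distinction concrete: with the existing mechanism the driver requests one page at a time and blocks at every page boundary, while with CASSANDRA-11521 the server keeps pushing pages over a dedicated connection. The asynchronous API is not part of the released driver, so the sketch below only shows the existing synchronous path, using the DataStax Java driver from Scala; the contact point, fetch size and query are illustrative assumptions, not benchmark settings.

{code}
import com.datastax.driver.core.{Cluster, SimpleStatement}

val cluster = Cluster.builder().addContactPoint("127.0.0.1").build()
val session = cluster.connect()

// Existing synchronous paging: the server returns at most fetchSize rows per
// page, and the driver blocks to request the next page whenever the iterator
// exhausts the current one.
val stmt = new SimpleStatement("SELECT val1, val2 FROM ks.schema1")
stmt.setFetchSize(5000)

var rows = 0L
val it = session.execute(stmt).iterator()
while (it.hasNext) {
  it.next()
  rows += 1
}

session.close()
cluster.close()
{code}

With a 5000-row fetch size, scanning 120M rows through a single session means on the order of 24,000 page boundaries, each paid for synchronously; this per-page round trip is what the asynchronous mechanism is designed to avoid.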

> Bulk Reading from Cassandra
> ---------------------------
>
>                 Key: CASSANDRA-9259
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-9259
>             Project: Cassandra
>          Issue Type: New Feature
>          Components: Compaction, CQL, Local Write-Read Paths, Streaming and 
> Messaging, Testing
>            Reporter:  Brian Hess
>            Assignee: Stefania
>            Priority: Critical
>             Fix For: 3.x
>
>         Attachments: 256_vnodes.jpg, before_after.jpg, 
> bulk-read-benchmark.1.html, bulk-read-jfr-profiles.1.tar.gz, 
> bulk-read-jfr-profiles.2.tar.gz, no_vnodes.jpg, spark_benchmark_raw_data.zip
>
>
> This ticket is following on from the 2015 NGCC.  This ticket is designed to 
> be a place for discussing and designing an approach to bulk reading.
> The goal is to have a bulk reading path for Cassandra.  That is, a path 
> optimized to grab a large portion of the data for a table (potentially all of 
> it).  This is a core element in the Spark integration with Cassandra, and the 
> speed at which Cassandra can deliver bulk data to Spark is limiting the 
> performance of Spark-plus-Cassandra operations.  This is especially of 
> importance as Cassandra will (likely) leverage Spark for internal operations 
> (for example CASSANDRA-8234).
> The core CQL to consider is the following:
> SELECT a, b, c FROM myKs.myTable WHERE Token(partitionKey) > X AND 
> Token(partitionKey) <= Y
> Here, we choose X and Y to be contained within one token range (perhaps 
> considering the primary range of a node without vnodes, for example).  This 
> query pushes 50K-100K rows/sec, which is not very fast if we are doing bulk 
> operations via Spark (or other processing frameworks - ETL, etc).  There are 
> a few causes (e.g., inefficient paging).
> There are a few approaches that could be considered.  First, we consider a 
> new "Streaming Compaction" approach.  The key observation here is that a bulk 
> read from Cassandra is a lot like a major compaction, though instead of 
> outputting a new SSTable we would output CQL rows to a stream/socket/etc.  
> This would be similar to a CompactionTask, but would strip out some 
> unnecessary things in there (e.g., some of the indexing, etc). Predicates and 
> projections could also be encapsulated in this new "StreamingCompactionTask", 
> for example.
> Another approach would be an alternate storage format.  For example, we might 
> employ Parquet (just as an example) to store the same data as in the primary 
> Cassandra storage (aka SSTables).  This is akin to Global Indexes (an 
> alternate storage of the same data optimized for a particular query).  Then, 
> Cassandra can choose to leverage this alternate storage for particular CQL 
> queries (e.g., range scans).
> These are just 2 suggestions to get the conversation going.
> One thing to note is that it will be useful to have this storage segregated 
> by token range so that when you extract via these mechanisms you do not get 
> replications-factor numbers of copies of the data.  That will certainly be an 
> issue for some Spark operations (e.g., counting).  Thus, we will want 
> per-token-range storage (even for single disks), so this will likely leverage 
> CASSANDRA-6696 (though, we'll want to also consider the single disk case).
> It is also worth discussing what the success criteria are here.  It is 
> unlikely to be as fast as EDW or HDFS performance (though, that is still a 
> good goal), but being within some percentage of that performance should be 
> set as success.  For example, 2x as long as doing bulk operations on HDFS 
> with similar node count/size/etc.



