[jira] [Commented] (CASSANDRA-9259) Bulk Reading from Cassandra

2016-07-29 Thread Stefania (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-9259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15400394#comment-15400394
 ] 

Stefania commented on CASSANDRA-9259:
-

Now that CASSANDRA-11521 is ready for review, I've repeated the Spark benchmark 
defined by CASSANDRA-11542 using schema 1:

{code}
CREATE TABLE ks.schema1 (id TEXT, timestamp BIGINT, val1 INT, val2 INT, val3 
INT, val4 INT, val5 INT, val6 INT, val7 INT, val8 INT, val9 INT, val10 INT, 
PRIMARY KEY (id, timestamp))
{code}

and schema 3:

{code}
CREATE TABLE ks.schema3 (id TEXT, timestamp BIGINT, data TEXT, PRIMARY KEY (id, 
timestamp))
{code}

The benchmark measures how many seconds it takes to count the rows and to find 
the maximum of two columns for each row, where rows are retrieved either via 
Spark RDDs or DataFrames (DFs). The most significant difference between the RDD 
and DF tests is that in the DF tests only the two columns of interest to the 
Spark query are retrieved, whilst in the RDD tests the entire data set is 
retrieved. The data is stored either in Cassandra or in HDFS as CSV or Parquet files.
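
To make the two flavours concrete, here is a minimal sketch of what such queries 
look like with the Spark Cassandra Connector. This is not the actual benchmark 
code (which is linked below); it assumes a Spark 1.6-era {{SparkContext}} {{sc}} 
and {{SQLContext}} {{sqlContext}}, and the aggregation shown is only illustrative:

{code}
import com.datastax.spark.connector._            // adds sc.cassandraTable
import org.apache.spark.sql.functions.{count, max}

// RDD flavour: every column of ks.schema1 is fetched and aggregated in Spark.
val rdd = sc.cassandraTable("ks", "schema1")
val rddResult = (rdd.count(),
                 rdd.map(_.getInt("val1")).max(),
                 rdd.map(_.getInt("val2")).max())

// DataFrame flavour: only the two columns of interest are requested, so the
// connector can prune the projection before the data leaves Cassandra.
val df = sqlContext.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "ks", "table" -> "schema1"))
  .load()
val dfResult = df.agg(count("*"), max("val1"), max("val2")).collect()
{code}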

More details on the benchmark are available 
[here|https://issues.apache.org/jira/browse/CASSANDRA-11542?focusedCommentId=15249213&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15249213]
 and the code is available [here|https://github.com/stef1927/spark-load-perf].

Here is a comparison with the results of the benchmark run on 6th May with 15M 
rows, as described in [this 
comment|https://issues.apache.org/jira/browse/CASSANDRA-11542?focusedCommentId=15273884&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15273884].
 We can see that the final results are consistent with the proof of concept, 
which was presented at the Cassandra NGCC conference.

!before_after.jpg!

* C* old: trunk with no optimizations (at commit c662d876b95d67a911dfe549d8a0d38ee6fbb904), and the Spark Connector without SPARK-C383
* C* POC: the [proof-of-concept patch|https://github.com/stef1927/cassandra/commits/9259], and the Spark Connector with an [earlier version|https://github.com/stef1927/spark-cassandra-connector/commits/9259] of SPARK-C383
* C* async: the CASSANDRA-11521 patch, with results delivered to the client via the new asynchronous paging mechanism
* C* sync: the CASSANDRA-11521 patch, with results delivered to the client via the existing synchronous paging mechanism

Here are the results run over several incremental data sets at 15M, 30M, 60M 
and 120M rows with 256 VNODES:

!256_vnodes.jpg!

Below are the results run over several incremental data sets at 15M, 30M, 60M 
and 120M rows without VNODES:

!no_vnodes.jpg!


The raw data is attached [^spark_benchmark_raw_data.zip].

h5. Conclusions

* Overall the duration of the 15M row test was improved by 65% (from about 40 
to 14 seconds) for schema 1 and by 56% (from 23 to 10 seconds) for schema 3.

* The new asynchronous paging mechanism significantly outperforms the existing 
mechanism with large data sets; for example, for schema 1 and 120M rows, it is 
approximately 30% faster. To achieve this, however, the driver must reserve a 
connection per asynchronous paging request: sharing connections degrades 
performance significantly and makes it no better than the existing mechanism.

* CSV still outperforms C* for the schema 1 RDD tests. However, for the DF tests 
and the schema 3 RDD tests, C* is now on par with or faster than CSV. This 
indicates that the number of cells in CQL rows continues to impact performance.

* Parquet is in a league of its own due to its efficient columnar format. It 
should however be noted that it may be storing the row count in metadata. A 
more meaningful benchmark could have been obtained had we excluded the row 
count from the time measurements.

> Bulk Reading from Cassandra
> ---
>
> Key: CASSANDRA-9259
> URL: https://issues.apache.org/jira/browse/CASSANDRA-9259
> Project: Cassandra
>  Issue Type: New Feature
>  Components: Compaction, CQL, Local Write-Read Paths, Streaming and 
> Messaging, Testing
>Reporter:  Brian Hess
>Assignee: Stefania
>Priority: Critical
> Fix For: 3.x
>
> Attachments: 256_vnodes.jpg, before_after.jpg, 
> bulk-read-benchmark.1.html, bulk-read-jfr-profiles.1.tar.gz, 
> bulk-read-jfr-profiles.2.tar.gz, no_vnodes.jpg, spark_benchmark_raw_data.zip
>
>
> This ticket is following on from the 2015 NGCC.  This ticket is designed to 
> be a place for discussing and designing an approach to bulk reading.
> The goal is to have a bulk reading path for Cassandra.  That is, a path 
> optimized to grab a large portion of the data for a table (potentially all of 
> it).  This is a core element in the Spark integration with Cassandra, and the 
> speed at which Cassandra 

[jira] [Commented] (CASSANDRA-9259) Bulk Reading from Cassandra

2016-04-13 Thread vincent.poncet (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-9259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15238992#comment-15238992
 ] 

vincent.poncet commented on CASSANDRA-9259:
---

In data warehouse / analytics use cases, you are mostly doing full scans 
(hopefully with some predicate pushdown, both projection and filtering), reading 
a large number of rows and then performing aggregations.
That takes time. So by definition, an analytic query on an OLTP database is 
always "wrong", in the sense that while the query is running, data is being 
changed, deleted, updated and inserted. Operational analytics is therefore 
always approximate.

So, my point is that for operational analytics, CL=1 will be perfectly fine.

> Bulk Reading from Cassandra
> ---
>
> Key: CASSANDRA-9259
> URL: https://issues.apache.org/jira/browse/CASSANDRA-9259
> Project: Cassandra
>  Issue Type: New Feature
>  Components: Compaction, CQL, Local Write-Read Paths, Streaming and 
> Messaging, Testing
>Reporter:  Brian Hess
>Assignee: Stefania
>Priority: Critical
> Fix For: 3.x
>
> Attachments: bulk-read-benchmark.1.html, 
> bulk-read-jfr-profiles.1.tar.gz, bulk-read-jfr-profiles.2.tar.gz
>
>
> This ticket is following on from the 2015 NGCC.  This ticket is designed to 
> be a place for discussing and designing an approach to bulk reading.
> The goal is to have a bulk reading path for Cassandra.  That is, a path 
> optimized to grab a large portion of the data for a table (potentially all of 
> it).  This is a core element in the Spark integration with Cassandra, and the 
> speed at which Cassandra can deliver bulk data to Spark is limiting the 
> performance of Spark-plus-Cassandra operations.  This is especially of 
> importance as Cassandra will (likely) leverage Spark for internal operations 
> (for example CASSANDRA-8234).
> The core CQL to consider is the following:
> SELECT a, b, c FROM myKs.myTable WHERE Token(partitionKey) > X AND 
> Token(partitionKey) <= Y
> Here, we choose X and Y to be contained within one token range (perhaps 
> considering the primary range of a node without vnodes, for example).  This 
> query pushes 50K-100K rows/sec, which is not very fast if we are doing bulk 
> operations via Spark (or other processing frameworks - ETL, etc).  There are 
> a few causes (e.g., inefficient paging).
> There are a few approaches that could be considered.  First, we consider a 
> new "Streaming Compaction" approach.  The key observation here is that a bulk 
> read from Cassandra is a lot like a major compaction, though instead of 
> outputting a new SSTable we would output CQL rows to a stream/socket/etc.  
> This would be similar to a CompactionTask, but would strip out some 
> unnecessary things in there (e.g., some of the indexing, etc). Predicates and 
> projections could also be encapsulated in this new "StreamingCompactionTask", 
> for example.
> Another approach would be an alternate storage format.  For example, we might 
> employ Parquet (just as an example) to store the same data as in the primary 
> Cassandra storage (aka SSTables).  This is akin to Global Indexes (an 
> alternate storage of the same data optimized for a particular query).  Then, 
> Cassandra can choose to leverage this alternate storage for particular CQL 
> queries (e.g., range scans).
> These are just 2 suggestions to get the conversation going.
> One thing to note is that it will be useful to have this storage segregated 
> by token range so that when you extract via these mechanisms you do not get 
> replications-factor numbers of copies of the data.  That will certainly be an 
> issue for some Spark operations (e.g., counting).  Thus, we will want 
> per-token-range storage (even for single disks), so this will likely leverage 
> CASSANDRA-6696 (though, we'll want to also consider the single disk case).
> It is also worth discussing what the success criteria is here.  It is 
> unlikely to be as fast as EDW or HDFS performance (though, that is still a 
> good goal), but being within some percentage of that performance should be 
> set as success.  For example, 2x as long as doing bulk operations on HDFS 
> with similar node count/size/etc.





[jira] [Commented] (CASSANDRA-9259) Bulk Reading from Cassandra

2016-04-06 Thread Stefania (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-9259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15229592#comment-15229592
 ] 

Stefania commented on CASSANDRA-9259:
-

Thanks. I've created CASSANDRA-11520 and CASSANDRA-11521.

I should also have added the link to the POC patch: 
https://github.com/stef1927/cassandra/commits/9259.



[jira] [Commented] (CASSANDRA-9259) Bulk Reading from Cassandra

2016-04-06 Thread Sylvain Lebresne (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-9259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15228185#comment-15228185
 ] 

Sylvain Lebresne commented on CASSANDRA-9259:
-

bq. In view of these results, I believe an optimized local read path is worth 
pursuing whilst streaming should also provide benefits

I agree, and I suggest creating 2 different sub-tickets for those and starting 
with that.



[jira] [Commented] (CASSANDRA-9259) Bulk Reading from Cassandra

2016-04-06 Thread Stefania (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-9259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15228054#comment-15228054
 ] 

Stefania commented on CASSANDRA-9259:
-

Here are the results of a proof of concept of an *optimized read path* for 
local reads at CL 1 and *streaming*.

These are the results in *ops/second*:

||Partition Size||Page size||Num. Partitions in the table||Synchronous, no 
optimization ops/s||Synchronous, with optimization ops/s||Prefetch a page, no 
optimization ops/s||Prefetch a page, with optimization ops/s||Streaming, 
measurement 1 ops/s||Streaming, measurement 2 ops/s||Improvement due to local 
read optimization||Improvement due to streaming||Total improvement||Link to the 
test||
|100 
KBYTES|10|250K|98|148|189|283|784|767|49.74%|174.03%|310.32%|[link|http://cstar.datastax.com/tests/id/6dc252e8-fb0c-11e5-9ad6-0256e416528f]|
|10 
KBYTES|100|1M|93|138|174|259|659|673|48.85%|157.14%|282.76%|[link|http://cstar.datastax.com/tests/id/d8002fe2-fad3-11e5-a500-0256e416528f]|
|1 
KBYTE|1000|1M|84|133|114|179|233|239|57.02%|31.84%|107.02%|[link|http://cstar.datastax.com/tests/id/8dd1c5ba-fad9-11e5-82e5-0256e416528f]|
|1 
KBYTE|1000|2M|60|98|94|153|247|248|62.77%|61.76%|163.30%|[link|http://cstar.datastax.com/tests/id/97be0520-fba6-11e5-bba8-0256e416528f]|
|100 
BYTES|1|5M|21|33|24|37|41|44|54.17%|14.86%|77.08%|[link|http://cstar.datastax.com/tests/id/95e31c0c-fb12-11e5-9ad6-0256e416528f]|
|50 
BYTES|2|5M|20|32|20|33|35|37|65.00%|9.09%|80.00%|[link|http://cstar.datastax.com/tests/id/bf1a52a0-fb8c-11e5-838f-0256e416528f]|
|10 
KBYTES|5000|500K|31|46|31|46|45|46|48.39%|-1.09%|46.77%|[link|http://cstar.datastax.com/tests/id/df9e6e96-fbbc-11e5-bf65-0256e416528f]|
|1 
KBYTE|5000|2M|35|56|40|64|66|66|60.00%|3.13%|65.00%|[link|http://cstar.datastax.com/tests/id/1d1785fc-fbb6-11e5-bf65-0256e416528f]|
|100 
BYTES|5000|5M|22|40|31|53|66|66|70.97%|24.53%|112.90%|[link|http://cstar.datastax.com/tests/id/70262d52-fbac-11e5-a876-0256e416528f]|


These are the same results but expressed in *rows/second*:

||Partition Size||Page size||Num. Partitions in the table||Synchronous, no 
optimization rows/s||Synchronous, with optimization rows/s||Prefetch a page, no 
optimization rows/s||Prefetch a page, with optimization rows/s||Streaming, 
measurement 1 rows/s||Streaming, measurement 2 rows/s||Improvement due to local 
read optimization||Improvement due to streaming||Total improvement||Link to the 
test||
|100 
KBYTES|10|250K|963|1453|1849|2761|7702|7522|49.32%|175.70%|311.68%|[link|http://cstar.datastax.com/tests/id/6dc252e8-fb0c-11e5-9ad6-0256e416528f]|
|10 
KBYTES|100|1M|8830|13159|16572|24591|62649|63975|48.39%|157.46%|282.04%|[link|http://cstar.datastax.com/tests/id/d8002fe2-fad3-11e5-a500-0256e416528f]|
|1 
KBYTE|1000|1M|52543|83548|71277|112558|145637|150070|57.92%|31.36%|107.44%|[link|http://cstar.datastax.com/tests/id/8dd1c5ba-fad9-11e5-82e5-0256e416528f]|
|1 
KBYTE|1000|2M|47029|76292|74131|119727|193738|193495|61.51%|61.71%|161.18%|[link|http://cstar.datastax.com/tests/id/97be0520-fba6-11e5-bba8-0256e416528f]|
|100 
BYTES|1|5M|88130|142800|100590|158780|176699|185292|57.85%|13.99%|79.93%|[link|http://cstar.datastax.com/tests/id/95e31c0c-fb12-11e5-9ad6-0256e416528f]|
|50 
BYTES|2|5M|94581|153296|97599|157463|169226|175581|61.34%|9.49%|76.64%|[link|http://cstar.datastax.com/tests/id/bf1a52a0-fb8c-11e5-838f-0256e416528f]|
|10 
KBYTES|5000|500K|14938|22356|15152|22623|22174|22419|49.31%|-1.44%|47.15%|[link|http://cstar.datastax.com/tests/id/df9e6e96-fbbc-11e5-bf65-0256e416528f]|
|1 
KBYTE|5000|2M|63552|100974|71644|115001|119537|119079|60.52%|3.75%|66.53%|[link|http://cstar.datastax.com/tests/id/1d1785fc-fbb6-11e5-bf65-0256e416528f]|
|100 
BYTES|5000|5M|70154|126547|98331|168121|208226|207482|70.97%|23.63%|111.38%|[link|http://cstar.datastax.com/tests/id/70262d52-fbac-11e5-a876-0256e416528f]|


The columns above refer to the following cassandra-stress operations:

*Synchronous page retrieval*
The client retrieves each page synchronously, with and without the optimized 
local read path.

*Asynchronous page retrieval (prefetch)*
The client retrieves the first page synchronously and then prefetches the next 
page, before processing the results of the previous page, with and without the 
optimized local read path.

*Streaming*
The client requests all pages initially and then waits synchronously for the 
first page. For the following pages, each operation processes a page that was 
previously delivered, blocking only if a page is unavailable. 

There are two equivalent measurements for streaming because the local read path 
optimization is always available: it would have added considerable extra work 
to implement streaming without the optimized read path, and it would only have 
provided comparison data that is already available.
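
To make the difference between the first two modes concrete, here is a minimal 
sketch of synchronous paging versus page prefetching using the standard DataStax 
Java driver API from Scala. This is not the actual _cassandra-stress_ code; 
{{session}}, {{stmt}} (a statement with a fetch size already set) and 
{{process}} are assumed:

{code}
import scala.collection.JavaConverters._

// Synchronous paging: the row iterator transparently blocks and fetches the
// next page only once the current page is exhausted.
val syncRs = session.execute(stmt)
syncRs.iterator().asScala.foreach(process)

// Prefetching: when the buffered rows run low and more pages exist, ask the
// driver to fetch the next page in the background while we keep processing
// the rows that are already available.
val prefetchRs = session.execute(stmt)
for (row <- prefetchRs.iterator().asScala) {
  if (prefetchRs.getAvailableWithoutFetching() == 100 && !prefetchRs.isFullyFetched())
    prefetchRs.fetchMoreResults()   // asynchronous
  process(row)
}
{code}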

*Results*
The improvement due to local read optimization is calculated by comparing the 

[jira] [Commented] (CASSANDRA-9259) Bulk Reading from Cassandra

2016-03-28 Thread Stefania (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-9259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15215303#comment-15215303
 ] 

Stefania commented on CASSANDRA-9259:
-

Thank you for sharing the code; it will save us a lot of time if we decide to 
try out different transfer mechanisms. For now, I am working on streaming and 
the other optimizations described above; later we may well focus on different 
transfer mechanisms.



[jira] [Commented] (CASSANDRA-9259) Bulk Reading from Cassandra

2016-03-28 Thread Stefania (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-9259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15215297#comment-15215297
 ] 

Stefania commented on CASSANDRA-9259:
-

Thank you for this observation. Whilst this is a valid point, the focus of this 
patch is local transfers at CL=1. For CL > 1, the latency introduced by 
coordinating across C* nodes is probably the dominant factor, and that will not 
be addressed in this patch.



[jira] [Commented] (CASSANDRA-9259) Bulk Reading from Cassandra

2016-03-28 Thread Gary Dusbabek (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-9259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15214212#comment-15214212
 ] 

Gary Dusbabek commented on CASSANDRA-9259:
--

FWIW I experimented with domain sockets late last year. Domain sockets were 
faster, but not much faster than reading over eth0->eth0, which on modern Linux 
distros goes over the loopback (try it).

Experimental branches of the DataStax Java driver and Cassandra: 
https://github.com/gdusbabek/cassandra/tree/cassandra-3.0.2-domain-socket and 
https://github.com/gdusbabek/java-driver/tree/domain_sockets_3.0





[jira] [Commented] (CASSANDRA-9259) Bulk Reading from Cassandra

2016-03-24 Thread Evan Chan (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-9259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15209915#comment-15209915
 ] 

Evan Chan commented on CASSANDRA-9259:
--

The biggest difference between HDFS and local client reads, say by scanning 
SSTables directly or going through mmap / domain sockets / other faster 
local-only mechanisms, is consistency. We're not guaranteed that any one of the 
N replicas has the exact same SSTables at any moment in time, are we? Thus, 
different Spark workers could end up with potentially different data for the 
same token range -- essentially CL=1 behavior, which seems not really 
acceptable to folks used to precision analytics.



[jira] [Commented] (CASSANDRA-9259) Bulk Reading from Cassandra

2016-03-23 Thread Stefania (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-9259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15209656#comment-15209656
 ] 

Stefania commented on CASSANDRA-9259:
-

I would like to make sure we are all on the same page and gather any further 
suggestions or thoughts. It would also help very much to have an indication of 
what we are aiming for, in terms of performance. I am not familiar with either 
EDW or HDFS, so it would be helpful to roughly quantify what's considered good.

We can optimize the existing read path by special-casing local reads at CL=1 
and by optimizing any other areas highlighted by the analysis above. The 
underlying classes for reading sstables are the same as those used for 
compaction, so this is similar to the "streaming compaction" approach mentioned 
above, provided we can replace CQL paging with streaming. To do this, we can 
add some options to the {{SELECT}} CQL command. I'm not sure it's required at 
all, but we can improve or change the CQL row output format if needed. The 
{{SELECT}} options would allow changing both the transfer mechanism and the 
output format, if required.

In addition to streaming, if local clients are important, we could also 
consider saving the output to a memory mapped file, or replacing sockets with 
Unix domain pipes, if tests indicate that these are significantly faster 
delivery mechanisms for local clients. 

Some alternatives:

* An off-line tool for scanning sstables. This is in principle simple and 
efficient but it has the disadvantage that it is a new tool, and we already 
have many tools. Because it requires access to sstables, it would not work very 
well for clients running on a different host.
* A JMX/nodetool command. As above, this is again something new, and 
implementing remote communication would require new plumbing, unless we just 
save the data to a file, which again would not be very friendly for remote 
clients.
* A new CQL command, something like {{BULK EXPORT}}. This is very similar to 
the approach described above. It is more work for the drivers, however, and the 
functionality is similar to {{SELECT}}, except for delivery and _maybe_ the 
output format. I would probably consider this approach only if we are positive 
that we need a new output format.

[~brianmhess], [~rspitzer], [~slebresne] any thoughts?


[jira] [Commented] (CASSANDRA-9259) Bulk Reading from Cassandra

2016-03-19 Thread Stefania (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-9259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15198628#comment-15198628
 ] 

Stefania commented on CASSANDRA-9259:
-

Thanks! 

100k rows/second looks like an upper bound, but my guess would be that it is 
related to CQL encoding, not so much to how fast we decode data from sstables. 
For the sake of completeness, we can push more than 100k rows/second by adding 
more threads to _cassandra-stress_. I'll make sure to add a test case with more 
columns and one with an even smaller partition to the next set of 
_cassandra-stress_ tests, so we'll know for sure. It may also be advantageous 
to have micro-benchmarks for decoding sstables and for other parts that become 
relevant to the final solution that we choose.



[jira] [Commented] (CASSANDRA-9259) Bulk Reading from Cassandra

2016-03-19 Thread Vassil Lunchev (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-9259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15197725#comment-15197725
 ] 

Vassil Lunchev commented on CASSANDRA-9259:
---

Very good results!

From the benchmarks it seems like 100k rows/second is something of a limit. I 
have seen that limit in tests as well, and to me it looks more like 100k 
cells/second.
Do you think Cassandra would be able to push more than 100k rows/second with 
partition sizes smaller than 100 bytes (I know that is impractical)?
Also, do you think adding more columns to the rows will have any effect? In 
other words, is the bound around 100k rows/second or around 100k cells/second?

If I had to bet, it would still be around 100k per second even with partitions 
smaller than 100 bytes, and the bottleneck would be the number of cells, not 
the number of rows.



[jira] [Commented] (CASSANDRA-9259) Bulk Reading from Cassandra

2016-03-18 Thread Stefania (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-9259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15197232#comment-15197232
 ] 

Stefania commented on CASSANDRA-9259:
-


Below are the results of a set of benchmarking measurements that were performed 
using the _cassandra-stress_ token range queries as delivered by 
CASSANDRA-10331. A token range query is a query as outlined in the description 
of this ticket: {{SELECT a, b, c FROM myKs.myTable WHERE Token(partitionKey) > 
X AND Token(partitionKey) <= Y}}.
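
For illustration, a client-side scan driven by such token range queries might 
look roughly as follows. This is a hedged sketch in Scala against the DataStax 
Java driver's token range metadata, not the _cassandra-stress_ implementation; 
{{cluster}}, {{session}} and {{processRow}} are assumed:

{code}
import scala.collection.JavaConverters._

val select = session.prepare(
  "SELECT a, b, c FROM myKs.myTable " +
  "WHERE token(partitionKey) > ? AND token(partitionKey) <= ?")

// One query per (unwrapped) token range owned by the cluster; together the
// ranges cover the whole table exactly once.
for (range <- cluster.getMetadata.getTokenRanges.asScala;
     sub   <- range.unwrap().asScala) {   // split ranges that wrap around the ring
  val rs = session.execute(select.bind()
    .setToken(0, sub.getStart)
    .setToken(1, sub.getEnd))
  rs.iterator().asScala.foreach(processRow)
}
{code}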

Below are suggested code enhancements that follow from an analysis of Java 
Flight Recorder profiles taken during these measurements.

h5. Optimizations to the current read path

* Avoid using heap buffers in {{DataOutputBuffer}}; ideally we should use the 
buffer pool, and the buffer pool should be extended to support sizes larger 
than 64 kbytes (point 4 of CASSANDRA-9468).

* Avoid creating a local response in {{LocalReadRunnable}}, which serializes 
the partition iterator in the READ stage thread only for it to be deserialized 
again in the storage proxy thread. I realize this is problematic because we 
need to guard sstables and memtables via a {{ReadExecutionController}}, as well 
as guarantee iterator safety, but removing this redundant step should help 
significantly for large analytics workloads where the client queries data 
locally. Perhaps this should be done together with a new output format 
optimized for this workflow, as described further below. Any thoughts 
[~slebresne]?

* Improve access to sstables for local token ranges in order to avoid the 
binary search on the index summary, and scanning of index entries until the 
range is found.

* Increase parallelism by splitting local token ranges; at the moment there is 
parallelism in the storage proxy {{getRangeSlice()}}, but ranges are only split 
by replica, and a local range is not divided into smaller ranges. Splitting a 
local range into sub-ranges, as stored in different sstables, should increase 
performance.

h5. Extensive re-alignment of format and transfer mechanism

In addition to optimizing the current read path, to avoid the cost of encoding 
CQL rows, we could replace them with a format that is more analytics friendly. 
This would be similar to a special compaction task outputting a columnar format 
optimized for the query as suggested above. This format should allow efficient 
decoding in the client and its data should be transferred from Cassandra to the 
client as quickly as possible. If the client runs locally, for example we could 
use one of the following mechanisms:

* shared memory; 
* Unix domain pipes;
* another [fast IPC mechanism|https://lwn.net/Articles/466304/]. 

We should probably leverage either [Apache Parquet|https://parquet.apache.org] 
for the format, or [Apache Arrow|https://arrow.apache.org] for both the format 
and the transfer mechanism. I think the latter is more aligned with what we are 
trying to do, but unfortunately that project is just starting. As far as I 
understand, eventually Spark will read Arrow memory, and so if we wrote Arrow 
memory the transfer should become extremely efficient.

h5. Benchmark setup

Each _cassandra-stress_ operation corresponds to the retrieval of one page 
using a token range query where all columns are retrieved. Each token range 
corresponds to a VNODE range and the Cassandra host was configured with 256 
ranges. Different partition sizes were tested: 100 bytes, 1 kbyte, 10 kbytes, 
100 kbytes and 1 MB. Each partition had a single CQL row; for the 100 kbytes 
partition size however, two more tests were added, with 100 and 1000 CQL rows 
respectively. The size of the rows was adjusted so that the partition size 
stayed at approximately 100 kbytes. 

The page size for each test was chosen so that the amount of data downloaded in 
each operation was roughly 1 MB (for example, 100 partitions of 10 kbytes per 
page), so the _cassandra-stress_ ops/second can be read roughly as MB/second. A 
single _cassandra-stress_ thread was used.

The tests were run on a GCE *n1-standard-8* VM: 4 Intel Xeon @ 2.60GHz physical 
cores with 2 hyper-threads per core (all cores on a single socket), 29 GB of 
memory and a 1073 GB SCSI hard disk. Both _cassandra-stress_ and the Cassandra 
process were running on the same VM, since, as I understand it, this is normally 
how Spark or other analytics tools are deployed.

h5. Benchmark results

The following is a summary of the results in ops / second; the attached 
[^bulk-read-benchmark.1.html] also contains diagrams, as well as 
partitions / second and rows / second.

||100-bytes||1-kbyte||10-kbytes||100-kbytes||100-kbytes-100-rows||100-kbytes-1000-rows||1-MB||
|28|59|101|80|50|10|73|

The performance is variable and depends heavily on the partition size and the 
number of CQL rows in a partition. Java Flight Recorder (JFR) files for all 
cases, albeit measured with a smaller sample, have also been attached; see 
[^bulk-read-jfr-profiles.1.tar.gz] and 

[jira] [Commented] (CASSANDRA-9259) Bulk Reading from Cassandra

2016-01-26 Thread Vassil Lunchev (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-9259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15117397#comment-15117397
 ] 

Vassil Lunchev commented on CASSANDRA-9259:
---

"For full data queries it may be advantageous to have C* be able to compact all 
of the relevant sstables into a format friendlier to analytics workloads."
I would even go further and say - "Cassandra needs a new compaction strategy. 
It has DateTieredCompactionStrategy for time series data. It needs a new one, 
for example ColumnarCompactionStrategy, that is similar in concept to Parquet 
and designed for analytics workloads."

The results here: https://github.com/velvia/cassandra-gdelt
and the ideas here: https://github.com/tuplejump/FiloDB
are very compelling. FiloDB is practically building a new columnar compaction 
layer on top of C*, and the results are quite promising: "faster than Parquet 
scan speeds" with storage needs "within 35% of Parquet".

> Bulk Reading from Cassandra
> ---
>
> Key: CASSANDRA-9259
> URL: https://issues.apache.org/jira/browse/CASSANDRA-9259
> Project: Cassandra
>  Issue Type: New Feature
>  Components: Compaction, CQL, Local Write-Read Paths, Streaming and 
> Messaging, Testing
>Reporter:  Brian Hess
>Priority: Critical
> Fix For: 3.x
>
>
> This ticket is following on from the 2015 NGCC.  This ticket is designed to 
> be a place for discussing and designing an approach to bulk reading.
> The goal is to have a bulk reading path for Cassandra.  That is, a path 
> optimized to grab a large portion of the data for a table (potentially all of 
> it).  This is a core element in the Spark integration with Cassandra, and the 
> speed at which Cassandra can deliver bulk data to Spark is limiting the 
> performance of Spark-plus-Cassandra operations.  This is especially of 
> importance as Cassandra will (likely) leverage Spark for internal operations 
> (for example CASSANDRA-8234).
> The core CQL to consider is the following:
> SELECT a, b, c FROM myKs.myTable WHERE Token(partitionKey) > X AND 
> Token(partitionKey) <= Y
> Here, we choose X and Y to be contained within one token range (perhaps 
> considering the primary range of a node without vnodes, for example).  This 
> query pushes 50K-100K rows/sec, which is not very fast if we are doing bulk 
> operations via Spark (or other processing frameworks - ETL, etc).  There are 
> a few causes (e.g., inefficient paging).
> There are a few approaches that could be considered.  First, we consider a 
> new "Streaming Compaction" approach.  The key observation here is that a bulk 
> read from Cassandra is a lot like a major compaction, though instead of 
> outputting a new SSTable we would output CQL rows to a stream/socket/etc.  
> This would be similar to a CompactionTask, but would strip out some 
> unnecessary things in there (e.g., some of the indexing, etc). Predicates and 
> projections could also be encapsulated in this new "StreamingCompactionTask", 
> for example.
> Another approach would be an alternate storage format.  For example, we might 
> employ Parquet (just as an example) to store the same data as in the primary 
> Cassandra storage (aka SSTables).  This is akin to Global Indexes (an 
> alternate storage of the same data optimized for a particular query).  Then, 
> Cassandra can choose to leverage this alternate storage for particular CQL 
> queries (e.g., range scans).
> These are just 2 suggestions to get the conversation going.
> One thing to note is that it will be useful to have this storage segregated 
> by token range so that when you extract via these mechanisms you do not get 
> replications-factor numbers of copies of the data.  That will certainly be an 
> issue for some Spark operations (e.g., counting).  Thus, we will want 
> per-token-range storage (even for single disks), so this will likely leverage 
> CASSANDRA-6696 (though, we'll want to also consider the single disk case).
> It is also worth discussing what the success criteria is here.  It is 
> unlikely to be as fast as EDW or HDFS performance (though, that is still a 
> good goal), but being within some percentage of that performance should be 
> set as success.  For example, 2x as long as doing bulk operations on HDFS 
> with similar node count/size/etc.





[jira] [Commented] (CASSANDRA-9259) Bulk Reading from Cassandra

2015-08-26 Thread Evan Volgas (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-9259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14715707#comment-14715707
 ] 

Evan Volgas commented on CASSANDRA-9259:


"For full data queries it may be advantageous to have C* be able to compact all 
of the relevant sstables into a format friendlier to analytics workloads." --- 
I cannot agree with [~rspitzer] enough on this point. I tried running Cassandra 
with Presto recently on a handful of r3.2xlarges... even a simple count(*) of a 
few million rows took 15-20 minutes, most of which was spent deserializing the 
SSTables for Presto to read (and that's to say nothing of the insanely high 
timeout settings required to make it work). If you could compact your data to 
something like Avro/Parquet, you would open the door to a tremendous variety of 
analytical / ad hoc / exploratory workloads. I would be nothing short of 
delighted if this were possible in Cassandra. 






[jira] [Commented] (CASSANDRA-9259) Bulk Reading from Cassandra

2015-07-25 Thread Benedict (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-9259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14641721#comment-14641721
 ] 

Benedict commented on CASSANDRA-9259:
-

FTR, I very much favour the streaming compaction approach. Compaction should be 
just about our most optimised code path. If we cannot make it fast enough, 
nothing will be. If it isn't currently fast enough, we should make it faster.

CASSANDRA-8630 and CASSANDRA-9500 are both related.






[jira] [Commented] (CASSANDRA-9259) Bulk Reading from Cassandra

2015-05-12 Thread Russell Alexander Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-9259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14540321#comment-14540321
 ] 

Russell Alexander Spitzer commented on CASSANDRA-9259:
--

For full data queries it may be advantageous to have C* be able to compact all 
of the relevant sstables into a format friendlier to analytics workloads. For 
example, a {{compactToParquet}} command could be quite useful for doing batch 
analytics (see the sketch below). 

This would be useful for full table work because:
* We are going to need to write the data to a new storage format (or spill 
serialized data to disk) anyway.
* A durable copy of the data in the new format stops us from worrying about 
losing spilled blocks during execution (with Spark executors dying and their 
block stores being lost).
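A hypothetical {{compactToParquet}} step could write rows out with parquet-avro along the lines of the sketch below; the schema, path and values are made up, and this is not an existing Cassandra command:

{code}
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;

public class ParquetExtract
{
    public static void main(String[] args) throws Exception
    {
        // Columnar schema for the rows being "compacted" out of the sstables.
        Schema schema = SchemaBuilder.record("row").fields()
            .requiredString("id")
            .requiredLong("timestamp")
            .requiredInt("val1")
            .endRecord();

        try (ParquetWriter<GenericRecord> writer = AvroParquetWriter
                .<GenericRecord>builder(new Path("/tmp/extract.parquet"))
                .withSchema(schema)
                .build())
        {
            GenericRecord rec = new GenericData.Record(schema);
            rec.put("id", "key-1");
            rec.put("timestamp", 1L);
            rec.put("val1", 42);
            writer.write(rec);   // one record per CQL row read from the table
        }
    }
}
{code}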







[jira] [Commented] (CASSANDRA-9259) Bulk Reading from Cassandra

2015-05-12 Thread Jeremy Hanna (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-9259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14541150#comment-14541150
 ] 

Jeremy Hanna commented on CASSANDRA-9259:
-

With compaction being single-threaded, perhaps we could parallelize to N 
extract files, where N is the number of concurrent compactors configured. That 
way you have a small number of files but use multiple CPU cores to make 
better use of the I/O; a rough sketch follows below.
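In the sketch, {{extractSlice()}} and the file naming are placeholders, and the thread count simply stands in for the {{concurrent_compactors}} setting:

{code}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelExtract
{
    public static void main(String[] args) throws Exception
    {
        int concurrentCompactors = 4;  // would come from the concurrent_compactors setting
        ExecutorService pool = Executors.newFixedThreadPool(concurrentCompactors);
        List<Future<?>> tasks = new ArrayList<>();

        // One extract file per compactor thread keeps the file count small while
        // using all of the configured compaction parallelism.
        for (int i = 0; i < concurrentCompactors; i++)
        {
            final int slice = i;
            tasks.add(pool.submit(() -> extractSlice(slice, "extract-" + slice + ".bin")));
        }

        for (Future<?> t : tasks)
            t.get();   // wait for all extract files to be written
        pool.shutdown();
    }

    static void extractSlice(int slice, String fileName)
    {
        // placeholder: stream this slice's sstable data into fileName
    }
}
{code}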






[jira] [Commented] (CASSANDRA-9259) Bulk Reading from Cassandra

2015-05-05 Thread Jonathan Ellis (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-9259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14529530#comment-14529530
 ] 

Jonathan Ellis commented on CASSANDRA-9259:
---

Also, CASSANDRA-8630






[jira] [Commented] (CASSANDRA-9259) Bulk Reading from Cassandra

2015-04-29 Thread Robert Stupp (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-9259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14520416#comment-14520416
 ] 

Robert Stupp commented on CASSANDRA-9259:
-

Related to CASSANDRA-6167?



