[
https://issues.apache.org/jira/browse/CASSANDRA-9259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ariel Weisberg updated CASSANDRA-9259:
--------------------------------------
Component/s: Streaming and Messaging
Local Write-Read Paths
CQL
Compaction
> Bulk Reading from Cassandra
> ---------------------------
>
> Key: CASSANDRA-9259
> URL: https://issues.apache.org/jira/browse/CASSANDRA-9259
> Project: Cassandra
> Issue Type: New Feature
> Components: Compaction, CQL, Local Write-Read Paths, Streaming and
> Messaging, Testing
> Reporter: Brian Hess
> Assignee: Ariel Weisberg
> Priority: Critical
> Fix For: 3.x
>
>
> This ticket follows on from the 2015 NGCC and is intended to be a place for
> discussing and designing an approach to bulk reading.
>
> The goal is to have a bulk reading path for Cassandra, that is, a path
> optimized to grab a large portion of the data for a table (potentially all of
> it). This is a core element in the Spark integration with Cassandra, and the
> speed at which Cassandra can deliver bulk data to Spark is limiting the
> performance of Spark-plus-Cassandra operations. This is especially important
> because Cassandra will (likely) leverage Spark for internal operations (for
> example CASSANDRA-8234).
>
> The core CQL to consider is the following:
>
>     SELECT a, b, c FROM myKs.myTable
>     WHERE Token(partitionKey) > X AND Token(partitionKey) <= Y
>
> Here, we choose X and Y to be contained within one token range (perhaps the
> primary range of a node without vnodes, for example). This query pushes
> 50K-100K rows/sec, which is not very fast for bulk operations via Spark (or
> other processing frameworks, ETL tools, etc.). There are a few causes (e.g.,
> inefficient paging).
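>
> For reference, here is a minimal sketch of this baseline path, assuming the
> DataStax Java driver (the contact point, keyspace/table, and token bounds are
> all illustrative):
>
>     import com.datastax.driver.core.Cluster;
>     import com.datastax.driver.core.Row;
>     import com.datastax.driver.core.Session;
>     import com.datastax.driver.core.SimpleStatement;
>
>     public class TokenRangeScan {
>         public static void main(String[] args) {
>             try (Cluster cluster = Cluster.builder()
>                                           .addContactPoint("127.0.0.1")
>                                           .build();
>                  Session session = cluster.connect()) {
>                 // One token range; real bounds come from the ring metadata.
>                 SimpleStatement stmt = new SimpleStatement(
>                         "SELECT a, b, c FROM myKs.myTable " +
>                         "WHERE Token(partitionKey) > ? AND Token(partitionKey) <= ?",
>                         Long.MIN_VALUE, 0L);
>                 // Larger pages amortize round trips, but every row still goes
>                 // through the regular read path and the native protocol.
>                 stmt.setFetchSize(10000);
>                 long count = 0;
>                 for (Row row : session.execute(stmt)) {
>                     count++;  // a real consumer would hand rows to Spark/ETL
>                 }
>                 System.out.println("rows: " + count);
>             }
>         }
>     }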
>
> There are a few approaches that could be considered. First, we consider a new
> "Streaming Compaction" approach. The key observation here is that a bulk read
> from Cassandra is a lot like a major compaction, though instead of outputting
> a new SSTable we would output CQL rows to a stream/socket/etc. This would be
> similar to a CompactionTask, but would strip out the work that is unnecessary
> here (e.g., some of the indexing). Predicates and projections could also be
> encapsulated in this new "StreamingCompactionTask", for example.
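>
> As a very rough sketch of the shape this could take (all of the types below
> are hypothetical stand-ins, not existing Cassandra internals; the merged
> iterator is assumed to come from the same machinery a CompactionTask drives
> today):
>
>     import java.io.DataOutputStream;
>     import java.io.IOException;
>     import java.net.Socket;
>     import java.util.Iterator;
>     import java.util.function.Predicate;
>
>     /** Stand-in for a CQL row produced by the compaction-style merge. */
>     interface CqlRow {
>         void writeTo(DataOutputStream out) throws IOException;  // projection applied here
>     }
>
>     /** Hypothetical streaming analogue of a CompactionTask: same merge, but
>      *  the sink is a socket rather than an SSTableWriter, so no index, bloom
>      *  filter, or summary work is done. */
>     class StreamingCompactionTask {
>         private final Iterator<CqlRow> merged;   // merged scan over the sstables
>         private final Predicate<CqlRow> filter;  // pushed-down predicate
>
>         StreamingCompactionTask(Iterator<CqlRow> merged, Predicate<CqlRow> filter) {
>             this.merged = merged;
>             this.filter = filter;
>         }
>
>         void run(Socket sink) throws IOException {
>             DataOutputStream out = new DataOutputStream(sink.getOutputStream());
>             while (merged.hasNext()) {
>                 CqlRow row = merged.next();
>                 if (filter.test(row)) {
>                     row.writeTo(out);
>                 }
>             }
>             out.flush();
>         }
>     }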
>
> Another approach would be an alternate storage format. For example, we might
> employ Parquet to store the same data as the primary Cassandra storage (aka
> SSTables). This is akin to Global Indexes (an alternate storage of the same
> data optimized for a particular query). Then, Cassandra can choose to
> leverage this alternate storage for particular CQL queries (e.g., range
> scans).
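>
> Loosely sketched (again with hypothetical types), the interesting part is the
> routing decision: Cassandra would pick the scan-optimized copy for a
> token-range scan and the regular read path otherwise:
>
>     import java.util.Iterator;
>
>     /** Hypothetical abstraction over one physical copy of a table's data. */
>     interface TableStore {
>         Iterator<Object[]> scan(long tokenLow, long tokenHigh, String... columns);
>     }
>
>     class QueryRouter {
>         private final TableStore sstables;      // primary storage
>         private final TableStore columnarCopy;  // e.g., a Parquet-formatted replica
>
>         QueryRouter(TableStore sstables, TableStore columnarCopy) {
>             this.sstables = sstables;
>             this.columnarCopy = columnarCopy;
>         }
>
>         /** Range scans go to the scan-optimized copy; everything else stays
>          *  on the sstables (which remain the source of truth). */
>         Iterator<Object[]> execute(boolean isRangeScan, long lo, long hi, String... cols) {
>             return (isRangeScan ? columnarCopy : sstables).scan(lo, hi, cols);
>         }
>     }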
>
> These are just two suggestions to get the conversation going.
>
> One thing to note is that it will be useful to have this storage segregated
> by token range, so that when you extract via these mechanisms you do not get
> replication-factor (RF) copies of the data. That will certainly be an issue
> for some Spark operations (e.g., counting). Thus, we will want
> per-token-range storage (even on a single disk), so this will likely leverage
> CASSANDRA-6696 (though we'll want to consider the single-disk case as well).
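>
> Once storage is segregated by token range, the deduplication itself is just
> ring arithmetic: an extract reads each node's primary range(s) exactly once.
> A simplified sketch (one token per node, Murmur3 long tokens):
>
>     import java.util.ArrayList;
>     import java.util.Collections;
>     import java.util.List;
>
>     class PrimaryRanges {
>         /** (exclusiveStart, inclusiveEnd] bounds, matching
>          *  Token(pk) > X AND Token(pk) <= Y. */
>         static final class Range {
>             final long exclusiveStart;
>             final long inclusiveEnd;
>             Range(long exclusiveStart, long inclusiveEnd) {
>                 this.exclusiveStart = exclusiveStart;
>                 this.inclusiveEnd = inclusiveEnd;
>             }
>         }
>
>         /** The primary range of the node owning sorted token t[i] is
>          *  (t[i-1], t[i]]. Reading exactly these ranges, once each, yields
>          *  every partition exactly once regardless of replication factor.
>          *  The wrapping range (t[n-1], t[0]] is emitted as-is here; a real
>          *  implementation would split it at Long.MIN_VALUE. */
>         static List<Range> primaryRanges(List<Long> ringTokens) {
>             List<Long> t = new ArrayList<>(ringTokens);
>             Collections.sort(t);
>             int n = t.size();
>             List<Range> ranges = new ArrayList<>();
>             for (int i = 0; i < n; i++) {
>                 ranges.add(new Range(t.get((i + n - 1) % n), t.get(i)));
>             }
>             return ranges;
>         }
>     }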
>
> It is also worth discussing what the success criteria are here. It is
> unlikely to match EDW or HDFS performance (though that is still a good goal),
> but being within some factor of that performance should count as success; for
> example, taking no more than 2x as long as the same bulk operation on HDFS
> with a similar node count/size/etc.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)