[ https://issues.apache.org/jira/browse/CASSANDRA-9259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15209915#comment-15209915 ]
Evan Chan commented on CASSANDRA-9259: -------------------------------------- The biggest difference between HDFS and local client reads, say by scanning SSTables directly or going through mmap / domain sockets / other faster local-only mechanism, is consistency. We're not guaranteed that any one of N replicas has the exact same SSTables for any moment in time, are we? Thus, different Spark workers would end up with potentially different data, for the same token range -- essentially CL-1 behavior, which seems not really acceptable to folks used to precision analytics. > Bulk Reading from Cassandra > --------------------------- > > Key: CASSANDRA-9259 > URL: https://issues.apache.org/jira/browse/CASSANDRA-9259 > Project: Cassandra > Issue Type: New Feature > Components: Compaction, CQL, Local Write-Read Paths, Streaming and > Messaging, Testing > Reporter: Brian Hess > Assignee: Stefania > Priority: Critical > Fix For: 3.x > > Attachments: bulk-read-benchmark.1.html, > bulk-read-jfr-profiles.1.tar.gz, bulk-read-jfr-profiles.2.tar.gz > > > This ticket is following on from the 2015 NGCC. This ticket is designed to > be a place for discussing and designing an approach to bulk reading. > The goal is to have a bulk reading path for Cassandra. That is, a path > optimized to grab a large portion of the data for a table (potentially all of > it). This is a core element in the Spark integration with Cassandra, and the > speed at which Cassandra can deliver bulk data to Spark is limiting the > performance of Spark-plus-Cassandra operations. This is especially of > importance as Cassandra will (likely) leverage Spark for internal operations > (for example CASSANDRA-8234). > The core CQL to consider is the following: > SELECT a, b, c FROM myKs.myTable WHERE Token(partitionKey) > X AND > Token(partitionKey) <= Y > Here, we choose X and Y to be contained within one token range (perhaps > considering the primary range of a node without vnodes, for example). This > query pushes 50K-100K rows/sec, which is not very fast if we are doing bulk > operations via Spark (or other processing frameworks - ETL, etc). There are > a few causes (e.g., inefficient paging). > There are a few approaches that could be considered. First, we consider a > new "Streaming Compaction" approach. The key observation here is that a bulk > read from Cassandra is a lot like a major compaction, though instead of > outputting a new SSTable we would output CQL rows to a stream/socket/etc. > This would be similar to a CompactionTask, but would strip out some > unnecessary things in there (e.g., some of the indexing, etc). Predicates and > projections could also be encapsulated in this new "StreamingCompactionTask", > for example. > Another approach would be an alternate storage format. For example, we might > employ Parquet (just as an example) to store the same data as in the primary > Cassandra storage (aka SSTables). This is akin to Global Indexes (an > alternate storage of the same data optimized for a particular query). Then, > Cassandra can choose to leverage this alternate storage for particular CQL > queries (e.g., range scans). > These are just 2 suggestions to get the conversation going. > One thing to note is that it will be useful to have this storage segregated > by token range so that when you extract via these mechanisms you do not get > replications-factor numbers of copies of the data. That will certainly be an > issue for some Spark operations (e.g., counting). Thus, we will want > per-token-range storage (even for single disks), so this will likely leverage > CASSANDRA-6696 (though, we'll want to also consider the single disk case). > It is also worth discussing what the success criteria is here. It is > unlikely to be as fast as EDW or HDFS performance (though, that is still a > good goal), but being within some percentage of that performance should be > set as success. For example, 2x as long as doing bulk operations on HDFS > with similar node count/size/etc. -- This message was sent by Atlassian JIRA (v6.3.4#6332)