[ https://issues.apache.org/jira/browse/CASSANDRA-9259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15197232#comment-15197232 ]

Stefania commented on CASSANDRA-9259:
-------------------------------------


Below are the results of a set of benchmark measurements performed using the 
_cassandra-stress_ token range queries delivered by CASSANDRA-10331. A token 
range query is a query as outlined in the description of this ticket: 
{{SELECT a, b, c FROM myKs.myTable WHERE Token(partitionKey) > X AND 
Token(partitionKey) <= Y}}.

The suggested code enhancements below follow from an analysis of Java Flight 
Recorder profiles taken during these measurements.

h5. Optimizations to the current read path

* Avoid using heap buffers in {{DataOutputBuffer}}; ideally we should use the 
buffer pool, and the buffer pool should be extended to support sizes larger than 
64 kbytes (point 4 of CASSANDRA-9468).

* Avoid creating a local response in {{LocalReadRunnable}}, which serializes 
the partition iterator in the READ stage thread only for it to be deserialized 
again in the storage proxy thread. I realize this is problematic because we 
need to guard sstables and memtables via a {{ReadExecutionController}}, as well 
as guarantee iterator safety, but removing this redundant step should help 
significantly for large analytics workloads where the client queries data 
locally. Perhaps this should be done together with a new output format 
optimized for this workflow, as described further below. Any thoughts 
[~slebresne]?

* Improve access to sstables for local token ranges, in order to avoid the 
binary search on the index summary and the scan of index entries until the 
range is found.

* Increase parallelism by splitting local token ranges; at the moment there is 
parallelism in the storage proxy {{getRangeSlice()}}, but ranges are only split 
by replica: a local range is not divided into smaller ranges. Splitting a local 
range into sub-ranges, as stored in different sstables, should increase 
performance (a minimal range-splitting sketch follows this list).
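
As a rough illustration of the last point, below is a minimal sketch of splitting a token range into contiguous sub-ranges. It does not use Cassandra's internal {{Range}} / {{Token}} classes; it simply assumes Murmur3-style signed 64-bit tokens and splits into equal token spans, whereas a real implementation would more likely split along sstable or index boundaries.

{code:java}
// Minimal sketch: split a token range (left, right] into N contiguous
// sub-ranges of roughly equal token span. Plain longs stand in for tokens.
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

public final class TokenRangeSplitter
{
    public static final class SubRange
    {
        public final long left;   // exclusive
        public final long right;  // inclusive
        SubRange(long left, long right) { this.left = left; this.right = right; }
        @Override
        public String toString() { return "(" + left + ", " + right + "]"; }
    }

    /** Splits (left, right] into {@code parts} sub-ranges; assumes left < right (no wrap-around). */
    public static List<SubRange> split(long left, long right, int parts)
    {
        BigInteger span = BigInteger.valueOf(right).subtract(BigInteger.valueOf(left));
        List<SubRange> result = new ArrayList<>(parts);
        long previous = left;
        for (int i = 1; i <= parts; i++)
        {
            // i-th bound = left + span * i / parts, computed without long overflow
            long bound = i == parts
                       ? right
                       : BigInteger.valueOf(left)
                                   .add(span.multiply(BigInteger.valueOf(i)).divide(BigInteger.valueOf(parts)))
                                   .longValueExact();
            result.add(new SubRange(previous, bound));
            previous = bound;
        }
        return result;
    }

    public static void main(String[] args)
    {
        // Each sub-range could be scanned by its own read command, e.g.
        // SELECT a, b, c FROM myKs.myTable WHERE Token(partitionKey) > left AND Token(partitionKey) <= right
        for (SubRange r : split(-4611686018427387904L, 0L, 4))
            System.out.println(r);
    }
}
{code}

Each sub-range could then be scanned by its own read command and the results concatenated, which is where the additional parallelism would come from.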

h5. Extensive re-alignment of format and transfer mechanism

In addition to optimizing the current read path, to avoid the cost of encoding 
CQL rows we could replace them with a format that is more analytics friendly. 
This would be similar to a special compaction task outputting a columnar format 
optimized for the query, as suggested above. This format should allow efficient 
decoding in the client, and its data should be transferred from Cassandra to the 
client as quickly as possible. If the client runs locally, for example, we could 
use one of the following mechanisms (a shared-memory sketch follows this list):

* shared memory;
* Unix pipes or domain sockets;
* another [fast IPC mechanism|https://lwn.net/Articles/466304/].
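
To make the shared-memory option more concrete, here is a minimal sketch that uses a memory-mapped file as the shared segment. The file path, the length-prefixed block layout and the absence of any synchronization are illustrative assumptions only, not a proposed implementation.

{code:java}
// Minimal sketch of the shared-memory option: the server maps a file and writes
// a length-prefixed block of encoded rows; a locally running client maps the
// same file and reads the block back. Framing, flow control and cleanup omitted.
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public final class SharedMemoryTransferSketch
{
    private static final Path SEGMENT = Paths.get("/dev/shm/bulk-read-segment"); // hypothetical path
    private static final int SEGMENT_SIZE = 64 * 1024 * 1024;                    // 64 MB window

    // Writer side (would run inside the Cassandra process).
    static void writeBlock(byte[] encodedRows) throws IOException
    {
        try (FileChannel channel = FileChannel.open(SEGMENT,
                StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE))
        {
            MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, SEGMENT_SIZE);
            buffer.putInt(encodedRows.length);   // length prefix
            buffer.put(encodedRows);             // payload
            buffer.force();                      // flush so the reader sees it
        }
    }

    // Reader side (would run inside the analytics client, e.g. a Spark executor).
    static byte[] readBlock() throws IOException
    {
        try (FileChannel channel = FileChannel.open(SEGMENT, StandardOpenOption.READ))
        {
            MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, SEGMENT_SIZE);
            byte[] payload = new byte[buffer.getInt()];
            buffer.get(payload);
            return payload;
        }
    }

    public static void main(String[] args) throws IOException
    {
        writeBlock("a,b,c".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(readBlock(), StandardCharsets.UTF_8));
    }
}
{code}

A real transport would obviously need framing, flow control and cleanup, but the point is that a locally running client could consume the encoded rows without going through the native protocol at all.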

We should probably leverage either [Apache Parquet|https://parquet.apache.org] 
for the format or [Apache Arrow|https://arrow.apache.org] for both the format 
and the transfer mechanism. I think the latter is more aligned with what we are 
trying to do, but unfortunately that project is only just starting. As far as I 
understand, Spark will eventually read Arrow memory directly, so if we wrote 
Arrow memory the transfer should become extremely efficient.

h5. Benchmark setup

Each _cassandra-stress_ operation corresponds to the retrieval of one page 
using a token range query in which all columns are retrieved. Each token range 
corresponds to a vnode range, and the Cassandra host was configured with 256 
ranges. Different partition sizes were tested: 100 bytes, 1 kbyte, 10 kbytes, 
100 kbytes and 1 MB. Each partition had a single CQL row; for the 100-kbyte 
partition size, however, two more tests were added, with 100 and 1000 CQL rows 
respectively. The size of the rows was adjusted so that the partition size 
stayed at approximately 100 kbytes.

The page size for each test was chosen so that the amount of data downloaded in 
each operation was roughly 1 MB, so the _cassandra-stress_ ops / second can be 
read as roughly MB / second. A single _cassandra-stress_ thread was used.
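
As an illustration of how the page sizes follow from that 1 MB target (the numbers below are just the back-of-the-envelope arithmetic, not the exact values used in the runs):

{code:java}
// Back-of-the-envelope page-size choice: one operation should fetch roughly
// 1 MB, so the page size (in CQL rows) is the target divided by the row size.
public final class PageSizeSketch
{
    static long rowsPerPage(long targetBytes, long approxRowSizeBytes)
    {
        return Math.max(1, targetBytes / approxRowSizeBytes);
    }

    public static void main(String[] args)
    {
        long target = 1_000_000;                            // ~1 MB per operation
        System.out.println(rowsPerPage(target, 100));       // 100-byte rows -> ~10000 rows per page
        System.out.println(rowsPerPage(target, 1_000));     // 1-kbyte rows  -> ~1000 rows per page
        System.out.println(rowsPerPage(target, 1_000_000)); // 1 MB rows     -> 1 row per page
    }
}
{code}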

The tests were run on a GCE *n1-standard-8* VM: 4 Intel Xeon physical cores @ 
2.60 GHz with 2 hyper-threads per core, all on a single socket, 29 GB of memory 
and a 1073 GB SCSI hard disk. Both _cassandra-stress_ and the Cassandra process 
were running on the same VM since, as far as I understand, this is normally how 
Spark or other analytics tools are deployed.

h5. Benchmark results

The following is a summary of the results in ops / second; the attached 
[^bulk-read-benchmark.1.html] also contains diagrams, as well as 
partitions / second and rows / second.

||100-bytes||1-kbyte||10-kbytes||100-kbytes||100-kbytes-100-rows||100-kbytes-1000-rows||1-MB||
|28|59|101|80|50|10|73|

The performance is variable and very much dependent on partition size and 
number of CQL rows in a partition. Java Flight Recorder (JFR) files for all 
cases, albeit measured with a smaller sample, have also been attached, see 
[^bulk-read-jfr-profiles.1.tar.gz] and [^bulk-read-jfr-profiles.2.tar.gz].

h5. Analysis of the JFR profiles

The following broad categories appear as hot-spots in all JFR profiles, but 
with varying impact depending on partition size and the number of CQL rows per 
partition:

* +Allocations outside of TLABs (Thread Local Allocation Buffers)+: for 
partition sizes bigger than 1 kbyte we have significant memory allocated 
outside of TLABs, caused by the heap byte buffers created by {{DataOutputBuffer}} 
when serializing partitions. Because of the heap buffers, +partition 
serialization+ becomes a hot-spot in most of the profiles. This happens when 
creating the local response for the storage proxy thread (a pooled 
direct-buffer sketch follows this list).

* +Partition deserialization+: this is expected, but the problem is that it is 
done twice: once when reading from sstables and again when reading the local 
response in the storage proxy thread.

* +CQL encoding+: this becomes noticeable for small partitions (100 bytes) or 
for large partitions divided into many CQL rows. There are two hot-spots, 
{{ResultSet.Codec.encode()}} and {{ResultSet.Codec.encodedSize()}}.

* +Reading+: various reading methods appear as hot-spots, especially 
{{ByteBufferUtil.readUnsignedVInt}}.

* +Uncompression and rebuffering+: this is only noticeable for partitions of 
1 MB, probably because the default compression chunk size and the maximum 
buffer pool size are both 64 kbytes.
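
To make the first category more concrete, here is a minimal sketch of the kind of change the first optimization above suggests: serving serialization scratch space from a small pool of reusable direct buffers instead of allocating a fresh heap buffer per partition. This is not Cassandra's {{BufferPool}} or {{DataOutputBuffer}} API, just an illustration of the idea.

{code:java}
// Minimal sketch: reuse pooled direct buffers for serialization scratch space
// instead of allocating a fresh heap buffer per partition.
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public final class DirectBufferPoolSketch
{
    private final BlockingQueue<ByteBuffer> pool;

    public DirectBufferPoolSketch(int buffers, int bufferSize)
    {
        pool = new ArrayBlockingQueue<>(buffers);
        for (int i = 0; i < buffers; i++)
            pool.add(ByteBuffer.allocateDirect(bufferSize)); // off-heap, allocated once
    }

    /** Takes a buffer from the pool, falling back to a transient direct buffer if the pool is empty. */
    public ByteBuffer acquire(int bufferSize)
    {
        ByteBuffer buffer = pool.poll();
        return buffer != null ? buffer : ByteBuffer.allocateDirect(bufferSize);
    }

    /** Returns a buffer to the pool; if the pool is full the buffer is simply dropped. */
    public void release(ByteBuffer buffer)
    {
        buffer.clear();
        pool.offer(buffer);
    }

    public static void main(String[] args)
    {
        DirectBufferPoolSketch pool = new DirectBufferPoolSketch(4, 128 * 1024); // e.g. 128-kbyte buffers
        ByteBuffer scratch = pool.acquire(128 * 1024);
        scratch.putLong(42L);   // serialize into off-heap scratch space
        pool.release(scratch);  // reuse rather than allocate a new heap buffer per partition
    }
}
{code}

Pooled direct buffers are allocated once and reused, so the large backing arrays that currently show up as allocations outside of TLABs should largely disappear from the profiles.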

The following are the top code paths for all profiles:

*100-bytes*

# {{ResultSet.Codec.encode}}, 2.78%, CQL encoding.
# {{BasePartitions.hasNext}}, 2.39%, partition deserialization.
# {{ResultSet.Codec.encodedSize}}, 2.08%, CQL encoding.

*1-kbyte*

# {{ByteBufferUtil.read}}, 2.83%, partition deserialization.
# {{ByteBufferUtil.readUnsignedVInt}}, 2.40%, reading.
# {{HeapByteBuffer}} allocations, 2.37%, allocations outside of TLABs.
# {{ResultSet.Codec.encodedSize}}, 1.94%, CQL encoding.

*10-kbytes*

# {{ByteBufferUtil.read}}, 16.69%, partition deserialization.
# {{HeapByteBuffer}} allocations, 11.27%, allocations outside of TLABs.

*100-kbytes*

# {{ByteBufferUtil.read}}, 32.66%, partition deserialization.
# {{HeapByteBuffer}} allocations, 21.17%, allocations outside of TLABs.


*100-kbytes-100-rows*

# {{ByteBufferUtil.read}}, 4.50%, partition deserialization.
# {{HeapByteBuffer}} allocations, 3.40%, allocations outside of TLABs.
# {{ResultSet.Codec.encodedSize}}, 2.99%, CQL encoding.
# {{RebufferingInputStream.readUnsignedVInt}}, 2.26%, reading.

*100-kbytes-1000-rows*

# {{ResultSet.Codec.encode}}, 5.03%, CQL encoding.
# {{ResultSet.Codec.encodedSize}}, 3.84%, CQL encoding.
# {{Cell.Serializer.deserialize}}, 2.06%, partition deserialization.

*1-mbyte*

# {{ByteBufferUtil.read}}, 21.26%, partition deserialization.
# {{LZ4Compressor.uncompress()}}, 8.64%, called by 
{{BigTableScanner.seekToCurrentRangeStart()}}, uncompression and rebuffering.
# {{HeapByteBuffer}} allocations, 4.86%, allocations outside of TLABs.


> Bulk Reading from Cassandra
> ---------------------------
>
>                 Key: CASSANDRA-9259
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-9259
>             Project: Cassandra
>          Issue Type: New Feature
>          Components: Compaction, CQL, Local Write-Read Paths, Streaming and 
> Messaging, Testing
>            Reporter:  Brian Hess
>            Assignee: Stefania
>            Priority: Critical
>             Fix For: 3.x
>
>         Attachments: bulk-read-benchmark.1.html, 
> bulk-read-jfr-profiles.1.tar.gz, bulk-read-jfr-profiles.2.tar.gz
>
>
> This ticket is following on from the 2015 NGCC.  This ticket is designed to 
> be a place for discussing and designing an approach to bulk reading.
> The goal is to have a bulk reading path for Cassandra.  That is, a path 
> optimized to grab a large portion of the data for a table (potentially all of 
> it).  This is a core element in the Spark integration with Cassandra, and the 
> speed at which Cassandra can deliver bulk data to Spark is limiting the 
> performance of Spark-plus-Cassandra operations.  This is especially of 
> importance as Cassandra will (likely) leverage Spark for internal operations 
> (for example CASSANDRA-8234).
> The core CQL to consider is the following:
> SELECT a, b, c FROM myKs.myTable WHERE Token(partitionKey) > X AND 
> Token(partitionKey) <= Y
> Here, we choose X and Y to be contained within one token range (perhaps 
> considering the primary range of a node without vnodes, for example).  This 
> query pushes 50K-100K rows/sec, which is not very fast if we are doing bulk 
> operations via Spark (or other processing frameworks - ETL, etc).  There are 
> a few causes (e.g., inefficient paging).
> There are a few approaches that could be considered.  First, we consider a 
> new "Streaming Compaction" approach.  The key observation here is that a bulk 
> read from Cassandra is a lot like a major compaction, though instead of 
> outputting a new SSTable we would output CQL rows to a stream/socket/etc.  
> This would be similar to a CompactionTask, but would strip out some 
> unnecessary things in there (e.g., some of the indexing, etc). Predicates and 
> projections could also be encapsulated in this new "StreamingCompactionTask", 
> for example.
> Another approach would be an alternate storage format.  For example, we might 
> employ Parquet (just as an example) to store the same data as in the primary 
> Cassandra storage (aka SSTables).  This is akin to Global Indexes (an 
> alternate storage of the same data optimized for a particular query).  Then, 
> Cassandra can choose to leverage this alternate storage for particular CQL 
> queries (e.g., range scans).
> These are just 2 suggestions to get the conversation going.
> One thing to note is that it will be useful to have this storage segregated 
> by token range so that when you extract via these mechanisms you do not get 
> replication-factor numbers of copies of the data.  That will certainly be an 
> issue for some Spark operations (e.g., counting).  Thus, we will want 
> per-token-range storage (even for single disks), so this will likely leverage 
> CASSANDRA-6696 (though, we'll want to also consider the single disk case).
> It is also worth discussing what the success criteria is here.  It is 
> unlikely to be as fast as EDW or HDFS performance (though, that is still a 
> good goal), but being within some percentage of that performance should be 
> set as success.  For example, 2x as long as doing bulk operations on HDFS 
> with similar node count/size/etc.



