[ 
https://issues.apache.org/jira/browse/CASSANDRA-1278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthew F. Dennis updated CASSANDRA-1278:
-----------------------------------------

    Attachment: 1278-cassandra-0.7.txt

Attached patch implements a Cassandra Pipelined Thrift (CPT) loader.

Thrift objects are serialized over a socket to C* in "segments".  Once the 
segment is loaded, C* responds with a CPTResponse object detailing the various 
counts about what was loaded and the connection is closed. Rows and/or columns 
need not be sorted.  No throttling need occur - the client can safely write 
data to the socket as fast as it will accept it.  CPT correctly handles 
secondary indexes.

The format of sending a segment is:

PROTOCOL_MAGIC (i.e. MessagingService.PROTOCOL_MAGIC)
PROTOCOL_HEADER (i.e. IncomingCassandraPipelinedThriftReader.PROTOCOL_HEADER)
CPTHeader
CPTRowHeader (for each row sent)
Column|SuperColumn (for each Column|SuperColumn in the row)

Each row is terminated by an empty Column or SuperColumn (e.g. 
IncomingCassandraPipelinedThriftReader.END_OF_COLUMNS) Each segment is 
terminated by an empty CPTRowHeader (e.g. 
IncomingCassandraPipelinedThriftReader.END_OF_ROWS)

A CPTHeader consists of several fields:

{code}
struct CPTHeader {
   1: required string keyspace,
   2: required string column_family,
   3: required i32 table_flush_size,
   4: required i32 forward_frame_size,
   5: required i32 so_rcvbuf_size,
   6: required i32 so_sndbuf_size,
   7: required bool forward,
}
{code}

table_flush_size controls the amount of data that is buffered before applying 
the mutation.  Several KB is a good starting value for this.

forward_frame_size only applies with forward=true and controls what thrift 
frame size to use when forwarding data to other nodes.  256K is a good value 
here, but it should be 1/2 or less the size of so_rcvbuf_size and 
so_sndbuf_size.

so_rcvbuf_size/so_sndbuf_size is the size of the socket buffer used by C* when 
accepting/forwarding CPT data respectively.  It should be at least twice as big 
as forward_frame_size and/or the frame sized used when sending CPT data.  
Values > 128K usually require changing net.core.rmem_max/net.core.wmem_max.

forward controls whether C* should forward data to other nodes.

bin/generatecptdata will produce test data with the parameters given in 
bin/generatecptdata.config

bin/listcptdetails will list information about file(s)/dir(s) given as 
arguments, including number of rows and size of the useful raw data (versus 
overhead/packaging).

bin/loadcptdata will load CPT files to a cluster and serves as a great starting 
point for doing this from other applications.

Thrift generates a *lot* of garbage and as such it is trivial to max out JVM GC 
with the current implementation.  Tuning GC for your loads is a good idea for 
best performance (smallish memtables and larger newgen is a good place to 
start).

After a CPT load it is important to flush as CPT loading skips the commitlog 
(which also implies that the "retry" division is on segments, not row and/or 
individual mutations).

Best performance, especially on larger clusters, will be achieved by 
partitioning your data into segments such that a given segment corresponds to 
precisely one range in the cluster (see loadcptdata for an example of how to do 
this).  Then sending each segment to each of the replicas for that range in a 
separate connection/thread with forward=false (note that loadcptdata does *not* 
do this, it assumes segments can contain data for any node).  This provides two 
important benefits:

1) it allows all nodes at all times to make as much progress as possible 
independent of other nodes/replicas slowing down (e.g. compaction, GC).

2) it allows you to retry precisely the failed segments on precisely the failed 
nodes if a node fails during a CPT load (in the more general sense it lets you 
reason about what data was loaded where instead of depending on AE, RR, HH).


> Make bulk loading into Cassandra less crappy, more pluggable
> ------------------------------------------------------------
>
>                 Key: CASSANDRA-1278
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-1278
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Tools
>            Reporter: Jeremy Hanna
>            Assignee: Matthew F. Dennis
>             Fix For: 0.7.1
>
>         Attachments: 1278-cassandra-0.7.txt
>
>   Original Estimate: 40h
>  Remaining Estimate: 40h
>
> Currently bulk loading into Cassandra is a black art.  People are either 
> directed to just do it responsibly with thrift or a higher level client, or 
> they have to explore the contrib/bmt example - 
> http://wiki.apache.org/cassandra/BinaryMemtable  That contrib module requires 
> delving into the code to find out how it works and then applying it to the 
> given problem.  Using either method, the user also needs to keep in mind that 
> overloading the cluster is possible - which will hopefully be addressed in 
> CASSANDRA-685
> This improvement would be to create a contrib module or set of documents 
> dealing with bulk loading.  Perhaps it could include code in the Core to make 
> it more pluggable for external clients of different types.
> It is just that this is something that many that are new to Cassandra need to 
> do - bulk load their data into Cassandra.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.

Reply via email to