[ 
https://issues.apache.org/jira/browse/CASSANDRA-7937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14242880#comment-14242880
 ] 

Michaël Figuière edited comment on CASSANDRA-7937 at 12/11/14 6:37 PM:
-----------------------------------------------------------------------

The StreamIDs, introduced in the native protocol to multiplex several pending 
requests on a single connection, could actually serve as a backpressure 
mechanism. Before protocol v2 we had just 128 IDs per connection with drivers 
typically allowing just a few connection per node. This therefore already acts 
as a throttling mechanism on the client side. With protocol v3 we've increased 
this limit but the driver still let the user define a value for the max 
requests per host that will have the same effect. A simple way the handle 
backpressure could therefore be to introduce a Window (similar as TCP Window) 
of the currently allowed concurrent requests for each client. Just like in TCP, 
the Window Size could be included in each response header to the client. This 
Window Size could then be adjusted using a magic formula to define, probably 
based on the load of each Stage of the Cassandra architecture, state of 
compaction, etc...

I agree with [~jbellis]'s point: backpressure in a distributed system like 
Cassandra, with a coordinator fowarding traffic to replicas, is confusing. But 
in practice, most recent CQL Drivers now do Token Aware Balancing by default 
(since 2.0.2 in the Java Driver), which will send the request to the replicas 
for any PreparedStatement (expected to be used under the high pressure 
condition described here). So in this situation the backpressure information 
received by the client could be used properly, as it would just be understood 
by the client as a request to slow down for *this* particular replica, it could 
therefore pick another replica. Thus we end up with a system in which we avoid 
doing Load Shedding (which is a waste of time, bandwidth and workload) and 
that, I believe, could behave more smoothly when the cluster is overloaded.

Note that this StreamID Window could be considered as a "mandatory" limit or 
just as a "hint" in the protocol specification. The driver could then adjust 
its strategy to use it or not depending on the settings or type of request.


was (Author: mfiguiere):
The StreamIDs, introduced in the native protocol to multiplex several pending 
requests on a single connection, could actually serve as a backpressure 
mechanism. Before protocol v2 we had just 128 IDs per connection with drivers 
typically allowing just a few connection per node. This therefore already acts 
as a throttling mechanism on the client side. With protocol v3 we've increased 
this limit but the driver still let the user define a value for the max 
requests per host that will have the same effect. A simple way the handle 
backpressure could therefore be to introduce a Window (similar as TCP Window) 
of the currently allowed concurrent requests for each client. Just like in TCP, 
the Window Size could be included in each response header to the client. This 
Window Size could then be adjusted using a magic formula to define, probably 
based on the load of each Stage of the Cassandra architecture, state of 
compaction, etc...

I agree with [~jbellis]'s point: backpressure in a distributed system like 
Cassandra, with a coordinator fowarding traffic to replicas, is confusing. But 
in practice, most recent CQL Drivers now do Token Aware Balancing by default 
(since 2.0.2 in the Java Driver), which will send the queries to the replicas 
any PreparedStatement (expected to be used under the high pressure condition 
described here). So in this situation the backpressure information received by 
the client could be used properly, as it would just be understood by the client 
as a request to slow down for *this* particular replica, it could therefore 
pick another replica. Thus we end up with a system in which we avoid doing Load 
Shedding (which is a waste of time, bandwidth and workload) and that, I 
believe, could behave more smoothly when the cluster is overloaded.

Note that this StreamID Window could be considered as a "mandatory" limit or 
just as a "hint" in the protocol specification. The driver could then adjust 
its strategy to use it or not depending on the settings or type of request.

> Apply backpressure gently when overloaded with writes
> -----------------------------------------------------
>
>                 Key: CASSANDRA-7937
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-7937
>             Project: Cassandra
>          Issue Type: Bug
>          Components: Core
>         Environment: Cassandra 2.0
>            Reporter: Piotr Kołaczkowski
>              Labels: performance
>
> When writing huge amounts of data into C* cluster from analytic tools like 
> Hadoop or Apache Spark, we can see that often C* can't keep up with the load. 
> This is because analytic tools typically write data "as fast as they can" in 
> parallel, from many nodes and they are not artificially rate-limited, so C* 
> is the bottleneck here. Also, increasing the number of nodes doesn't really 
> help, because in a collocated setup this also increases number of 
> Hadoop/Spark nodes (writers) and although possible write performance is 
> higher, the problem still remains.
> We observe the following behavior:
> 1. data is ingested at an extreme fast pace into memtables and flush queue 
> fills up
> 2. the available memory limit for memtables is reached and writes are no 
> longer accepted
> 3. the application gets hit by "write timeout", and retries repeatedly, in 
> vain 
> 4. after several failed attempts to write, the job gets aborted 
> Desired behaviour:
> 1. data is ingested at an extreme fast pace into memtables and flush queue 
> fills up
> 2. after exceeding some memtable "fill threshold", C* applies adaptive rate 
> limiting to writes - the more the buffers are filled-up, the less writes/s 
> are accepted, however writes still occur within the write timeout.
> 3. thanks to slowed down data ingestion, now flush can finish before all the 
> memory gets used
> Of course the details how rate limiting could be done are up for a discussion.
> It may be also worth considering putting such logic into the driver, not C* 
> core, but then C* needs to expose at least the following information to the 
> driver, so we could calculate the desired maximum data rate:
> 1. current amount of memory available for writes before they would completely 
> block
> 2. total amount of data queued to be flushed and flush progress (amount of 
> data to flush remaining for the memtable currently being flushed)
> 3. average flush write speed



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to