[ 
https://issues.apache.org/jira/browse/CASSANDRA-4718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13997731#comment-13997731
 ] 

Benedict edited comment on CASSANDRA-4718 at 5/14/14 4:36 PM:
--------------------------------------------------------------

A brief outline of the approach taken by the executor service I've submitted:

It's premised on the idea that unpark() is a relatively expensive operation, 
and can block progress on the thread calling it (often it results in transfer 
of the physical execution to the signalled thread). So we want to avoid 
performing the operation as much as possible, so long as we do not incur any 
other penalties as a result of doing so.

The approach I've taken to avoiding calling unpark() essentially amounts to 
trying to ensure the correct number of threads are running for servicing the 
current workload, without either delay of service or any waiting on any of the 
workers. We achieve this by essentially letting workers schedule themselves, 
except when, on producing work for the queue, we cannot guarantee that they 
will do so promptly (in which rare instance we spin up a worker directly) or 
the queue is full, in which case it costs us little to contribute to firing up 
workers. This can be roughly described as:

# If all workers are currently either sleeping _indefinitely_ or occupied with 
work, we wake one (or start a new) worker
# Before starting any given task, a worker checks if any more work is available 
on the queue it's processing and tries to hand it off to another unoccupied 
worker (preferring those that are scheduled to wake up of their own accord in 
the near future, to avoid signalling it, but waking/starting one if necessary)
# Once we finish a task, we either:
#* take another task from the queue we just processed, if any available, and 
loop back to (2); 
#* reassign ourselves to another executor that has work and go to (2); 
#* finally, if that fails, we enter a "yield"-spin loop
# Each loop we spin for, we sleep a random interval scaled by the number of 
threads in this loop, so that the rate of wakeup on average is constant 
regardless of the number of spinning threads. When we wake up we:
#* Check if we should deschedule ourselves (based on the total time spent 
sleeping by all threads recently - if it exceeds the real time elapsed, we put 
a worker to sleep indefinitely, preferably ourselves)
#* Try to assign ourselves an executor with work outstanding, and go to (2)

The actual assignment and queueing of work is itself a little interesting as 
well: to minimise signalling we have a ConcurrentLinkedQueue which is, by 
definition, unbounded. We then have a separate synchronisation state which 
maintains an atomic count of work permits (threads working the pool) and task 
permits (items on the queue). When we start a worker as a _producer_ we 
actually don't touch this queue at all, we just start a worker in a spinning 
state and let it assign itself some work. We do this to avoid signalling any 
other producers that may be blocked on the queue being full. When as a worker 
we take work from the queue to either assign to ourselves _or another worker_ 
we always atomically take both a worker permit and a task permit (or only the 
latter if we already own a task permit). This allows us to ensure we only wake 
up threads when they definitely have work to do.




was (Author: benedict):
A brief outline of the approach taken by the executor service I've submitted:

It's premised on the idea that unpark() is a relatively expensive operation, 
and can block progress on the thread calling it (often it results in transfer 
of the physical execution to the signalled thread). So we want to avoid 
performing the operation as much as possible, so long as we do not incur any 
other penalties as a result of doing so.

The approach I've taken to avoiding calling unpark() essentially amounts to 
trying to ensure the correct number of threads are running for servicing the 
current workload, without either delay of service or any waiting on any of the 
workers. We achieve this by essentially letting workers schedule themselves, 
except when we cannot guarantee they will do so on producing work for the queue 
(in which rare instance we spin up a worker directly) or the queue is full, in 
which case it costs us little to contribute to firing up workers. This can be 
roughly described as:

# If all workers are currently either sleeping _indefinitely_ or occupied with 
work, we wake one (or start a new) worker
# Before starting any given task, a worker checks if any more work is available 
on the queue it's processing and tries to hand it off to another unoccupied 
worker (preferring those that are scheduled to wake up of their own accord in 
the near future, to avoid signalling it, but waking/starting one if necessary)
# Once we finish a task, we either:
#* take another task from the queue we just processed, if any available, and 
loop back to (2); 
#* reassign ourselves to another executor that has work and go to (2); 
#* finally, if that fails, we enter a "yield"-spin loop
# Each loop we spin for, we sleep a random interval scaled by the number of 
threads in this loop, so that the rate of wakeup on average is constant 
regardless of the number of spinning threads. When we wake up we:
#* Check if we should deschedule ourselves (based on the total time spent 
sleeping by all threads recently - if it exceeds the real time elapsed, we put 
a worker to sleep indefinitely, preferably ourselves)
#* Try to assign ourselves an executor with work outstanding, and go to (2)

The actual assignment and queueing of work is itself a little interesting as 
well: to minimise signalling we have a ConcurrentLinkedQueue which is, by 
definition, unbounded. We then have a separate synchronisation state which 
maintains an atomic count of work permits (threads working the pool) and task 
permits (items on the queue). When we start a worker as a _producer_ we 
actually don't touch this queue at all, we just start a worker in a spinning 
state and let it assign itself some work. We do this to avoid signalling any 
other producers that may be blocked on the queue being full. When as a worker 
we take work from the queue to either assign to ourselves _or another worker_ 
we always atomically take both a worker permit and a task permit (or only the 
latter if we already own a task permit). This allows us to ensure we only wake 
up threads when they definitely have work to do.



> More-efficient ExecutorService for improved throughput
> ------------------------------------------------------
>
>                 Key: CASSANDRA-4718
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-4718
>             Project: Cassandra
>          Issue Type: Improvement
>            Reporter: Jonathan Ellis
>            Assignee: Benedict
>            Priority: Minor
>              Labels: performance
>             Fix For: 2.1.0
>
>         Attachments: 4718-v1.patch, PerThreadQueue.java, aws.svg, 
> aws_read.svg, backpressure-stress.out.txt, baq vs trunk.png, 
> belliotsmith_branches-stress.out.txt, jason_read.svg, jason_read_latency.svg, 
> jason_write.svg, op costs of various queues.ods, stress op rate with various 
> queues.ods, v1-stress.out
>
>
> Currently all our execution stages dequeue tasks one at a time.  This can 
> result in contention between producers and consumers (although we do our best 
> to minimize this by using LinkedBlockingQueue).
> One approach to mitigating this would be to make consumer threads do more 
> work in "bulk" instead of just one task per dequeue.  (Producer threads tend 
> to be single-task oriented by nature, so I don't see an equivalent 
> opportunity there.)
> BlockingQueue has a drainTo(collection, int) method that would be perfect for 
> this.  However, no ExecutorService in the jdk supports using drainTo, nor 
> could I google one.
> What I would like to do here is create just such a beast and wire it into (at 
> least) the write and read stages.  (Other possible candidates for such an 
> optimization, such as the CommitLog and OutboundTCPConnection, are not 
> ExecutorService-based and will need to be one-offs.)
> AbstractExecutorService may be useful.  The implementations of 
> ICommitLogExecutorService may also be useful. (Despite the name these are not 
> actual ExecutorServices, although they share the most important properties of 
> one.)



--
This message was sent by Atlassian JIRA
(v6.2#6252)

Reply via email to