[
https://issues.apache.org/jira/browse/CASSANDRA-4718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13635532#comment-13635532
]
Piotr Kołaczkowski commented on CASSANDRA-4718:
-----------------------------------------------
I made another version of the benchmark, following Sergio's suggestions. It now
uses the following message processing graph:
{noformat}
   /------ stage 0 processor 0 ----\         /----         ----\                   /---         ---\
   +------ stage 0 processor 1 ----+         +----         ----+                   +---         ---+
>--+------ stage 0 processor 2 ----+---->----+---- STAGE 1 ----+------>- ... >--->-+--- STAGE m ---+----->
   +------ ...                 ----+         +----         ----+                   +---         ---+
   \------ stage 0 processor n ----/         \----         ----/                   \---         ---/
{noformat}
128 threads concurrently push messages through all the stages and measure the
average latency, including the time required for a message to enter stage 0.
Thread-pool stages are built from fixed-size thread pools with n=8, because
there are 8 cores.
Actor-based stages are built from 128 actors each, with a RoundRobinRouter in
front of every stage.
Average latencies:
{noformat}
3 stages:
Sync: 364687 ns
Async: 210766 ns
Akka: 201842 ns
4 stages:
Sync: 492581 ns
Async: 221118 ns
Akka: 239407 ns
5 stages:
Sync: 671733 ns
Async: 245370 ns
Akka: 283798 ns
6 stages:
Sync: 781759 ns
Async: 262742 ns
Akka: 309384 ns
{noformat}
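So Akka comes out slightly slower than the async thread pools at 4 or more
stages (at 3 stages it is marginally faster), and both are well ahead of the
sync version.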
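One way to read the table is as the average latency added per extra stage, taken between the 3-stage and 6-stage runs. The small sketch below only does that arithmetic on the figures reported above; `Row`, `PerStageCost` and `perStage` are names made up for this illustration and are not part of the benchmark.

```scala
/** One line of the results table; all latencies are in nanoseconds. */
case class Row(stages: Int, syncNs: Double, asyncNs: Double, akkaNs: Double)

object PerStageCost {
  // Average latencies copied from the results above.
  val rows = Seq(
    Row(3, 364687, 210766, 201842),
    Row(4, 492581, 221118, 239407),
    Row(5, 671733, 245370, 283798),
    Row(6, 781759, 262742, 309384))

  /** Average latency added per extra stage, between the first and last row. */
  def perStage(pick: Row => Double): Double =
    (pick(rows.last) - pick(rows.head)) / (rows.last.stages - rows.head.stages)

  def main(args: Array[String]): Unit = {
    printf("Sync:  %8.0f ns per extra stage%n", perStage(_.syncNs))
    printf("Async: %8.0f ns per extra stage%n", perStage(_.asyncNs))
    printf("Akka:  %8.0f ns per extra stage%n", perStage(_.akkaNs))
  }
}
```

On these figures that works out to roughly 139 us per extra stage for Sync, 17 us for Async and 36 us for Akka. This is a two-point estimate from a single run, so treat it as a rough indication only.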
If someone wants to play with my code, here is the up-to-date version:
{noformat}
import java.util.concurrent.{CountDownLatch, Executors}
import akka.actor.{Props, ActorSystem, Actor, ActorRef}
import akka.routing.{SmallestMailboxRouter, RoundRobinRouter}

class Message {
  var counter = 0
  val latch = new CountDownLatch(1)
}

abstract class MultistageThreadPoolProcessor(stageCount: Int) {
  val stages =
    for (i <- 1 to stageCount) yield Executors.newFixedThreadPool(8)

  def shutdown() {
    stages.foreach(_.shutdown())
  }
}

/** Synchronously processes a message through the stages.
  * The message is passed stage-to-stage by the coordinator thread. */
class SyncThreadPoolProcessor(stageCount: Int)
  extends MultistageThreadPoolProcessor(stageCount) {

  def process() {
    val message = new Message
    val task = new Runnable() {
      def run() { message.counter += 1 }
    }
    for (executor <- stages)
      executor.submit(task).get()
  }
}

/** Asynchronously processes a message through the stages.
  * Every stage, after finishing its processing of the message,
  * passes the message directly to the next stage, without bothering the
  * coordinator thread. */
class AsyncThreadPoolProcessor(stageCount: Int)
  extends MultistageThreadPoolProcessor(stageCount) {

  def process() {
    val message = new Message
    val task = new Runnable() {
      def run() {
        message.counter += 1
        if (message.counter >= stages.size)
          message.latch.countDown()
        else
          stages(message.counter).submit(this)
      }
    }
    stages(0).submit(task)
    message.latch.await()
  }
}

/** Similar to AsyncThreadPoolProcessor, but it uses the Akka actor system
  * instead of thread pools and queues.
  * Every stage, after finishing its processing of the message,
  * passes the message directly to the next stage, without bothering the
  * coordinator thread. */
class AkkaProcessor(stageCount: Int) {
  val system = ActorSystem()

  val stages: IndexedSeq[ActorRef] = {
    for (i <- 1 to stageCount) yield
      system.actorOf(Props(createActor()).withRouter(RoundRobinRouter(nrOfInstances = 128)))
  }

  def createActor(): Actor = {
    new Actor {
      def receive = {
        case m: Message =>
          m.counter += 1
          if (m.counter >= stages.size)
            m.latch.countDown()
          else
            stages(m.counter) ! m
      }
    }
  }

  def process() {
    val message = new Message
    stages(0) ! message
    message.latch.await()
  }

  def shutdown() {
    system.shutdown()
  }
}

object MessagingBenchmark extends App {

  def measureLatency(count: Int, f: () => Any): Double = {
    val start = System.nanoTime()
    for (i <- 1 to count)
      f()
    val end = System.nanoTime()
    (end - start).toDouble / count
  }

  def measureLatency(threadCount: Int, messageCount: Int, f: () => Any): Double = {
    class RequestThread extends Thread {
      var latency: Double = 0.0
      override def run() { latency = measureLatency(messageCount, f) }
    }
    val threads =
      for (i <- 1 to threadCount) yield new RequestThread()
    threads.foreach(_.start())
    threads.foreach(_.join())
    threads.map(_.latency).sum / threads.size
  }

  val messageCount = 50000
  for (stageCount <- List(3,3,4,5,6,7,8,16,32))
  {
    printf("\n%d stages: \n", stageCount)
    val syncProcessor = new SyncThreadPoolProcessor(stageCount)
    val asyncProcessor = new AsyncThreadPoolProcessor(stageCount)
    val akkaProcessor = new AkkaProcessor(stageCount)
    printf("Sync: %8.0f ns\n", measureLatency(128, messageCount, syncProcessor.process))
    printf("Async: %8.0f ns\n", measureLatency(128, messageCount, asyncProcessor.process))
    printf("Akka: %8.0f ns\n", measureLatency(128, messageCount, akkaProcessor.process))
    syncProcessor.shutdown()
    asyncProcessor.shutdown()
    akkaProcessor.shutdown()
  }
}
{noformat}
> More-efficient ExecutorService for improved throughput
> ------------------------------------------------------
>
> Key: CASSANDRA-4718
> URL: https://issues.apache.org/jira/browse/CASSANDRA-4718
> Project: Cassandra
> Issue Type: Improvement
> Reporter: Jonathan Ellis
> Priority: Minor
> Attachments: baq vs trunk.png, PerThreadQueue.java
>
>
> Currently all our execution stages dequeue tasks one at a time. This can
> result in contention between producers and consumers (although we do our best
> to minimize this by using LinkedBlockingQueue).
> One approach to mitigating this would be to make consumer threads do more
> work in "bulk" instead of just one task per dequeue. (Producer threads tend
> to be single-task oriented by nature, so I don't see an equivalent
> opportunity there.)
> BlockingQueue has a drainTo(collection, int) method that would be perfect for
> this. However, no ExecutorService in the jdk supports using drainTo, nor
> could I google one.
> What I would like to do here is create just such a beast and wire it into (at
> least) the write and read stages. (Other possible candidates for such an
> optimization, such as the CommitLog and OutboundTCPConnection, are not
> ExecutorService-based and will need to be one-offs.)
> AbstractExecutorService may be useful. The implementations of
> ICommitLogExecutorService may also be useful. (Despite the name these are not
> actual ExecutorServices, although they share the most important properties of
> one.)
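
To make the drainTo idea from the issue description concrete, here is a minimal sketch of a single consumer thread that dequeues tasks in batches rather than one at a time. It is only an illustration under assumptions: `BatchingWorker`, `maxBatch` and the demo object are hypothetical names, nothing here is taken from Cassandra or from the attached PerThreadQueue.java, and handling of tasks that throw is left out.

```scala
import java.util.ArrayList
import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue}

/** Hypothetical consumer thread that takes up to maxBatch tasks per dequeue. */
class BatchingWorker(queue: LinkedBlockingQueue[Runnable], maxBatch: Int) extends Thread {
  @volatile private var running = true

  override def run(): Unit = {
    val batch = new ArrayList[Runnable](maxBatch)
    try {
      while (running) {
        // Block until at least one task is available...
        batch.add(queue.take())
        // ...then grab whatever else is already queued, up to the batch limit.
        queue.drainTo(batch, maxBatch - 1)
        val it = batch.iterator()
        while (it.hasNext) it.next().run()
        batch.clear()
      }
    } catch {
      case _: InterruptedException => // interrupted by shutdown(): exit the loop
    }
  }

  def shutdown(): Unit = {
    running = false
    interrupt()
  }
}

object BatchingWorkerDemo {
  def main(args: Array[String]): Unit = {
    val queue = new LinkedBlockingQueue[Runnable]()
    val done = new CountDownLatch(1000)
    val worker = new BatchingWorker(queue, maxBatch = 32)
    worker.start()
    for (_ <- 1 to 1000)
      queue.put(new Runnable { def run(): Unit = done.countDown() })
    done.await()
    worker.shutdown()
  }
}
```

The blocking `take()` keeps the worker idle when the queue is empty, while `drainTo` amortizes the queue's locking over several tasks when it is busy. A real ExecutorService would also need several workers, rejection handling and orderly shutdown, none of which is shown.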