[
https://issues.apache.org/jira/browse/CASSANDRA-10528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15212391#comment-15212391
]
T Jake Luciani edited comment on CASSANDRA-10528 at 3/25/16 8:58 PM:
---------------------------------------------------------------------
I've done some work trying out some ideas, and here are my results.
I'm still only focused on RF=1 in-memory reads; I just want to see how much
faster they can be in the existing codebase.
Test Setup:
* 1 c3.8xlarge (16 "cores", 32g ram, tsc clock)
* 6 m3.2xlarge stress nodes
I checked with perf and verified that the max TX of the C* node was ~952Mb.
Next, for a baseline, I hacked a version of trunk that simply returns a cached
response from netty.
*With this cached netty response version I was able to max out the network of
the C* node at 600k/sec.*
*With trunk version I was able to get 240k/sec with a p99 latency of 37ms and a
p999 of 67ms.*
Next I tried a couple of approaches to RxJava schedulers.
* Wrote a busy-spin event loop with thread affinity and the ability to route
each token to a single core. This approach required pinning the netty threads
to specific cores and the rest of the work to the other cores. I think this
approach worked best with 1/4 of the cores focused on netty work and the rest
on other work.
* Used the netty event loop as the RxJava scheduler loop, with/without affinity.
The loop is set up to process all the events for a given request on the same
event loop.
*The Netty event loop ended up working best with -Dio.netty.eventLoopThreads=32,
where I saw 288k/sec with a p99 of 25ms and a p999 of 49ms.*
*That is an increase of ~15% in throughput, and tail latency improved by >25%.*
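For reference, here's a minimal sketch of that second approach, assuming RxJava 1.x
and netty 4 (the class and method names are mine, not from the branch). A netty
EventLoop is a ScheduledExecutorService, so it can back an RxJava Scheduler
directly via Schedulers.from():
{code}
import io.netty.channel.EventLoop;
import rx.Scheduler;
import rx.schedulers.Schedulers;

public final class NettyRxScheduler
{
    // Every Rx stage scheduled here runs on the loop's single thread, which is
    // what keeps all events for a given request on the same event loop.
    public static Scheduler forEventLoop(EventLoop loop)
    {
        return Schedulers.from(loop);
    }
}
{code}
A request pipeline would then use {{.subscribeOn(NettyRxScheduler.forEventLoop(channel.eventLoop()))}} so it never hops off its loop.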
I'm going to try running this on a beefier instance like an i2.8xlarge, which
has 32 cores and a 10g network.
> Proposal: Integrate RxJava
> --------------------------
>
> Key: CASSANDRA-10528
> URL: https://issues.apache.org/jira/browse/CASSANDRA-10528
> Project: Cassandra
> Issue Type: Improvement
> Reporter: T Jake Luciani
> Assignee: T Jake Luciani
> Fix For: 3.x
>
> Attachments: rxjava-stress.png
>
>
> The purpose of this ticket is to discuss the merits of integrating the
> [RxJava|https://github.com/ReactiveX/RxJava] framework into C*, enabling us
> to incrementally make the internals of C* async and move away from SEDA to a
> more modern thread-per-core architecture.
> Related tickets:
> * CASSANDRA-8520
> * CASSANDRA-8457
> * CASSANDRA-5239
> * CASSANDRA-7040
> * CASSANDRA-5863
> * CASSANDRA-6696
> * CASSANDRA-7392
> My *primary* goals in raising this issue are to provide a way of:
> * *Incrementally* making the backend async
> * Avoiding code complexity/readability issues
> * Avoiding NIH where possible
> * Building on an extendable library
> My *non*-goals in raising this issue are:
>
> * Rewrite the entire database in one big bang
> * Write our own async api/framework
>
> -------------------------------------------------------------------------------------
> I attempted to integrate RxJava a while back and found it not ready, mainly
> due to our lack of lambda support. Now, with Java 8, I've found it very
> enjoyable and have not hit any performance issues. A gentle introduction to
> RxJava is [here|http://blog.danlew.net/2014/09/15/grokking-rxjava-part-1/] as
> well as their
> [wiki|https://github.com/ReactiveX/RxJava/wiki/Additional-Reading]. The
> primary concept of RX is the
> [Observable|http://reactivex.io/documentation/observable.html], which is
> essentially a stream of stuff you can subscribe to and act on, chain, etc.
> This is quite similar to [Java 8 streams
> api|http://www.oracle.com/technetwork/articles/java/ma14-java-se-8-streams-2177646.html]
> (or, I should say, the streams api is similar to it). The difference is that
> Java 8 streams can't be used for asynchronous events, while RxJava can.
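> A toy illustration of that difference (my example, not from the ticket): the
> subscriber below reacts to items as they arrive from a background thread,
> which a Java 8 stream pipeline can't express.
> {code}
> import rx.Observable;
> import rx.schedulers.Schedulers;
>
> public class AsyncObservableDemo
> {
>     public static void main(String[] args) throws InterruptedException
>     {
>         Observable.just("row1", "row2", "row3")
>                   .subscribeOn(Schedulers.io())   // emit on a background thread
>                   .map(String::toUpperCase)       // transform items as they flow
>                   .subscribe(row -> System.out.println(
>                           Thread.currentThread().getName() + ": " + row));
>
>         Thread.sleep(100); // give the async pipeline time to finish
>     }
> }
> {code}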
> Another improvement since I last tried integrating RxJava is the completion
> of CASSANDRA-8099, which provides a very iterable/incremental approach to
> our storage engine. *Iterators and Observables are well paired conceptually,
> so morphing our current storage engine to be async is much simpler now.*
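> For instance (my snippet, not code from the branch), any Iterable-based result
> lifts into an Observable in one call:
> {code}
> import java.util.Arrays;
> import rx.Observable;
>
> public class IterableToObservable
> {
>     public static void main(String[] args)
>     {
>         // RxJava 1.x: Observable.from(Iterable) lifts an iterator-centric
>         // result into a composable stream
>         Observable.from(Arrays.asList("a", "b", "c"))
>                   .subscribe(System.out::println);
>     }
> }
> {code}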
> In an effort to show how one can incrementally change our backend, I've done a
> quick POC with RxJava that makes our non-paging read requests non-blocking.
> https://github.com/apache/cassandra/compare/trunk...tjake:rxjava-3.0
> As you can probably see, the code is straightforward and sometimes quite nice!
> *Old*
> {code}
> private static PartitionIterator fetchRows(List<SinglePartitionReadCommand<?>> commands, ConsistencyLevel consistencyLevel)
> throws UnavailableException, ReadFailureException, ReadTimeoutException
> {
>     int cmdCount = commands.size();
>
>     SinglePartitionReadLifecycle[] reads = new SinglePartitionReadLifecycle[cmdCount];
>     for (int i = 0; i < cmdCount; i++)
>         reads[i] = new SinglePartitionReadLifecycle(commands.get(i), consistencyLevel);
>
>     for (int i = 0; i < cmdCount; i++)
>         reads[i].doInitialQueries();
>
>     for (int i = 0; i < cmdCount; i++)
>         reads[i].maybeTryAdditionalReplicas();
>
>     for (int i = 0; i < cmdCount; i++)
>         reads[i].awaitResultsAndRetryOnDigestMismatch();
>
>     for (int i = 0; i < cmdCount; i++)
>         if (!reads[i].isDone())
>             reads[i].maybeAwaitFullDataRead();
>
>     List<PartitionIterator> results = new ArrayList<>(cmdCount);
>     for (int i = 0; i < cmdCount; i++)
>     {
>         assert reads[i].isDone();
>         results.add(reads[i].getResult());
>     }
>
>     return PartitionIterators.concat(results);
> }
> {code}
> *New*
> {code}
> private static Observable<PartitionIterator> fetchRows(List<SinglePartitionReadCommand<?>> commands, ConsistencyLevel consistencyLevel)
> throws UnavailableException, ReadFailureException, ReadTimeoutException
> {
>     return Observable.from(commands)
>                      .map(command -> new SinglePartitionReadLifecycle(command, consistencyLevel))
>                      .flatMap(read -> read.getPartitionIterator())
>                      .toList()
>                      .map(results -> PartitionIterators.concat(results));
> }
> {code}
> Since the read call is now non-blocking (no more future.get()), we can remove
> one thread pool hop from the native netty request pool, which yields a
> non-trivial improvement to read performance.
> !rxjava-stress.png|width=800px!
> http://cstar.datastax.com/tests/id/ae648c12-729a-11e5-8625-0256e416528f
> At the same time, the current Iterator-based api still works by calling
> {{.toBlocking()}} on the observable, so, for example, the existing thrift read
> call requires little modification.
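> A hypothetical caller-side snippet (RxJava 1.x) of that bridge:
> {code}
> // Legacy/synchronous paths consume the same async pipeline by blocking on it:
> PartitionIterator iter = fetchRows(commands, consistencyLevel)
>                              .toBlocking()
>                              .single();  // waits for the one concatenated iterator
> {code}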
> On the async side we get the added benefits of RxJava:
> * Customizable backpressure strategies (for dealing with streams that can't
> be processed quickly enough)
> * Cancelling of work due to timeouts is a 1 line change
> * When a Subscriber disconnects from the stream, the Observable stops as
> well
> * Batching/windowing of work can be added in one line
> * Observers and Subscribers can do work across any thread at any stage of
> the pipeline
> * Observables can be [debugged|https://github.com/ReactiveX/RxJavaDebug]
> and
> [tested|http://reactivex.io/RxJava/javadoc/rx/observers/TestSubscriber.html]
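> To make a couple of those concrete (illustrative one-liners assuming RxJava
> 1.x, not code from the branch):
> {code}
> import java.util.concurrent.TimeUnit;
>
> Observable<PartitionIterator> result =
>     fetchRows(commands, consistencyLevel)
>         .timeout(5, TimeUnit.SECONDS)    // cancel the request on timeout
>         .onBackpressureBuffer();         // pick a backpressure strategy
> {code}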
> Another plus is the community surrounding RxJava; specifically, our good
> friends at Netflix have authored it and use it extensively. Docs and examples
> are good.
> In order to get the most out of this, we will need to take this api further
> into the code: MessagingService, disk access / page cache, thread per core...
> but again I want to hammer home that this can be achieved incrementally.
> On the bad side, this means:
> * Locking into a "framework"
> * We will inevitably hit bugs / performance issues that we need fixed upstream
> * Some of the more advanced API uses look pretty mentally taxing/hard to
> grasp
> Which brings us to the alternatives, primarily just using
> CompletableFutures.
> We certainly could, but if you look at the code changes I had to make to get
> the SP calls asynchronous, I think you will realize you would need to pass
> all kinds of state around to get the read command callback to start the netty
> write, whereas observables make that pipeline declarative. Also, more
> advanced things like backpressure and message passing between N:M producers
> and consumers become complex. This isn't to say we can't [use
> both|http://www.nurkiewicz.com/2014/11/converting-between-completablefuture.html]
> if Observables are overkill.
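> As a sketch of what "using both" could look like (my code, following the
> approach in that link), a CompletableFuture wraps into an Observable in a few
> lines:
> {code}
> import java.util.concurrent.CompletableFuture;
> import rx.Observable;
>
> static <T> Observable<T> toObservable(CompletableFuture<T> future)
> {
>     return Observable.create(subscriber ->
>         future.whenComplete((value, error) -> {
>             if (error != null)
>                 subscriber.onError(error);
>             else
>             {
>                 subscriber.onNext(value);
>                 subscriber.onCompleted();
>             }
>         }));
> }
> {code}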
> I hope this ticket sparks some good discussion!
>