[
https://issues.apache.org/jira/browse/CASSANDRA-10528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14959135#comment-14959135
]
Jonathan Ellis commented on CASSANDRA-10528:
--------------------------------------------
What about MessagingService?
> Proposal: Integrate RxJava
> --------------------------
>
> Key: CASSANDRA-10528
> URL: https://issues.apache.org/jira/browse/CASSANDRA-10528
> Project: Cassandra
> Issue Type: Improvement
> Reporter: T Jake Luciani
> Fix For: 3.x
>
> Attachments: rxjava-stress.png
>
>
> The purpose of this ticket is to discuss the merits of integrating the
> [RxJava|https://github.com/ReactiveX/RxJava] framework into C*. This would
> enable us to incrementally make the internals of C* async and move away from
> SEDA to a more modern thread-per-core architecture.
> Related tickets:
> * CASSANDRA-8520
> * CASSANDRA-8457
> * CASSANDRA-5239
> * CASSANDRA-7040
> * CASSANDRA-5863
> * CASSANDRA-6696
> * CASSANDRA-7392
> My *primary* goals in raising this issue are to provide a way of:
> * *Incrementally* making the backend async
> * Avoiding code complexity/readability issues
> * Avoiding NIH where possible
> * Building on an extendable library
> My *non*-goals in raising this issue are:
>
> * Rewrite the entire database in one big bang
> * Write our own async api/framework
>
> -------------------------------------------------------------------------------------
> I attempted to integrate RxJava a while back and found it wasn't ready, mainly
> due to our lack of lambda support. Now, with Java 8, I've found it very
> enjoyable and have not hit any performance issues. A gentle introduction to
> RxJava is [here|http://blog.danlew.net/2014/09/15/grokking-rxjava-part-1/] as
> well as their
> [wiki|https://github.com/ReactiveX/RxJava/wiki/Additional-Reading]. The
> primary concept of Rx is the
> [Observable|http://reactivex.io/documentation/observable.html], which is
> essentially a stream of items you can subscribe to, act on, chain, etc.
> This is quite similar to the [Java 8 streams
> API|http://www.oracle.com/technetwork/articles/java/ma14-java-se-8-streams-2177646.html]
> (or rather, the streams API is similar to it). The difference is that Java 8
> streams can't be used for asynchronous events, while RxJava can.
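The following sketch makes the pull/push difference concrete. It uses only the JDK, with Java 9+ `java.util.concurrent.SubmissionPublisher` standing in for an RxJava `Observable`, so it does not show RxJava's own API. The class and method names are hypothetical. The stream version is pulled synchronously on the calling thread. The publisher version pushes each item to its consumer on the publisher's executor, so the caller receives a future rather than a value.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;
import java.util.stream.Collectors;

final class PullVsPush {
    // Values received plus the names of the threads that delivered them.
    static final class Result {
        final List<Integer> values = new CopyOnWriteArrayList<>();
        final List<String> deliveryThreads = new CopyOnWriteArrayList<>();
    }

    // Pull: the terminal collect() drives the whole pipeline on the calling thread.
    static List<Integer> doubledWithStream(List<Integer> input) {
        return input.stream().map(x -> x * 2).collect(Collectors.toList());
    }

    // Push: the publisher delivers items asynchronously on its executor thread(s),
    // not on the caller's thread; the caller gets a future instead of a value.
    static CompletableFuture<Result> doubledWithPublisher(List<Integer> input) {
        Result result = new Result();
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
        // consume() subscribes the lambda and returns a future completed on onComplete.
        CompletableFuture<Void> done = publisher.consume(x -> {
            result.deliveryThreads.add(Thread.currentThread().getName());
            result.values.add(x * 2);
        });
        input.forEach(publisher::submit);
        publisher.close(); // onComplete follows the already-submitted items
        return done.thenApply(ignored -> result);
    }
}
```

Both versions return `[2, 4, 6]` for `[1, 2, 3]`. Only the second one lets the caller do other work while the items are processed.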
> Another improvement since I last tried integrating RxJava is the completion
> of CASSANDRA-8099, which provides a very iterable/incremental approach to
> our storage engine. *Iterators and Observables are well paired conceptually,
> so morphing our current storage engine to be async is much simpler now.*
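The pairing can be sketched as follows. An `Iterator` is pulled by its consumer (`hasNext`/`next`), while an observer is pushed to by its producer (`onNext`/`onError`/`onComplete`), so draining one into the other is mechanical. The `PushObserver` interface and `IteratorBridge` helpers below are hypothetical and exist only to show the shape of the adaptation. In the POC, RxJava's `Observable.from(Iterable)` plays this role.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical push-side counterpart of java.util.Iterator (not RxJava's Observer).
interface PushObserver<T> {
    void onNext(T item);
    void onError(Throwable error);
    void onComplete();
}

final class IteratorBridge {
    // Pulls each element from the iterator and pushes it to the observer.
    // A RuntimeException from the iterator (or from onNext) is routed to onError,
    // and onComplete is signalled only if iteration finished normally.
    static <T> void pushAll(Iterator<? extends T> source, PushObserver<? super T> observer) {
        try {
            while (source.hasNext()) {
                observer.onNext(source.next());
            }
        } catch (RuntimeException e) {
            observer.onError(e);
            return;
        }
        observer.onComplete();
    }

    // Usage: record the signals an observer receives for a given iterator.
    static <T> List<String> trace(Iterator<T> source) {
        List<String> events = new ArrayList<>();
        pushAll(source, new PushObserver<T>() {
            @Override public void onNext(T item) { events.add("next:" + item); }
            @Override public void onError(Throwable error) { events.add("error:" + error.getMessage()); }
            @Override public void onComplete() { events.add("complete"); }
        });
        return events;
    }
}
```

Because errors travel down the same channel as data, a failure in the storage layer can surface as an `onError` signal rather than an exception thrown on whichever thread happens to be iterating.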
> To show how one can incrementally change our backend, I've done a quick POC
> with RxJava that makes our non-paging read requests non-blocking.
> https://github.com/apache/cassandra/compare/trunk...tjake:rxjava-3.0
> As you can probably see, the code is straightforward and sometimes quite nice!
> *Old*
> {code}
> private static PartitionIterator fetchRows(List<SinglePartitionReadCommand<?>> commands, ConsistencyLevel consistencyLevel)
>     throws UnavailableException, ReadFailureException, ReadTimeoutException
> {
>     int cmdCount = commands.size();
>     SinglePartitionReadLifecycle[] reads = new SinglePartitionReadLifecycle[cmdCount];
>     for (int i = 0; i < cmdCount; i++)
>         reads[i] = new SinglePartitionReadLifecycle(commands.get(i), consistencyLevel);
>     for (int i = 0; i < cmdCount; i++)
>         reads[i].doInitialQueries();
>     for (int i = 0; i < cmdCount; i++)
>         reads[i].maybeTryAdditionalReplicas();
>     for (int i = 0; i < cmdCount; i++)
>         reads[i].awaitResultsAndRetryOnDigestMismatch();
>     for (int i = 0; i < cmdCount; i++)
>         if (!reads[i].isDone())
>             reads[i].maybeAwaitFullDataRead();
>     List<PartitionIterator> results = new ArrayList<>(cmdCount);
>     for (int i = 0; i < cmdCount; i++)
>     {
>         assert reads[i].isDone();
>         results.add(reads[i].getResult());
>     }
>     return PartitionIterators.concat(results);
> }
> {code}
> *New*
> {code}
> private static Observable<PartitionIterator> fetchRows(List<SinglePartitionReadCommand<?>> commands, ConsistencyLevel consistencyLevel)
>     throws UnavailableException, ReadFailureException, ReadTimeoutException
> {
>     return Observable.from(commands)
>                      .map(command -> new SinglePartitionReadLifecycle(command, consistencyLevel))
>                      .flatMap(read -> read.getPartitionIterator())
>                      .toList()
>                      .map(results -> PartitionIterators.concat(results));
> }
> {code}
> Since the read call is now non-blocking (no more future.get()), we can remove
> one thread pool hop from the native netty request pool, which yields a
> non-trivial improvement to read performance.
> !rxjava-stress.png|width=800px!
> http://cstar.datastax.com/tests/id/ae648c12-729a-11e5-8625-0256e416528f
> At the same time, the current Iterator-based API still works by calling
> {{.toBlocking()}} on the observable, so, for example, the existing Thrift read
> call requires little modification.
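Bridging from an async result back to the existing pull-based API amounts to waiting for the result and handing out an `Iterator`. The sketch below illustrates only that idea, not how RxJava's `.toBlocking()` is implemented. It uses a JDK `CompletableFuture` as a stand-in for the Observable. The names `readAsync`, `blockingIterator`, and `readAllBlocking` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

final class BlockingBridge {
    // Hypothetical non-blocking read: the rows are produced on another thread.
    static CompletableFuture<List<String>> readAsync(List<String> keys, Executor executor) {
        return CompletableFuture.supplyAsync(
                () -> keys.stream().map(k -> "row(" + k + ")").collect(Collectors.toList()),
                executor);
    }

    // Blocking adapter for callers that still expect an Iterator:
    // join() parks the calling thread until the async result is available.
    static <T> Iterator<T> blockingIterator(CompletableFuture<List<T>> pending) {
        return pending.join().iterator();
    }

    // Example legacy caller: start the async read, then drain it through the adapter.
    static List<String> readAllBlocking(List<String> keys) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            List<String> out = new ArrayList<>();
            blockingIterator(readAsync(keys, pool)).forEachRemaining(out::add);
            return out;
        } finally {
            pool.shutdown();
        }
    }
}
```

Only the legacy caller pays the cost of blocking. The async path (`readAsync`) stays unchanged, which is what allows the migration to happen incrementally.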
> On the async side we get the added benefits of RxJava:
> * Customizable backpressure strategies (for dealing with streams that can't
> be processed quickly enough)
> * Cancelling work due to timeouts is a one-line change
> * When a Subscriber disconnects from the stream, the Observable stops as
> well
> * Batching/windowing of work can be added in one line
> * Observers and Subscribers can do work across any thread at any stage of
> the pipeline
> * Observables can be [debugged|https://github.com/ReactiveX/RxJavaDebug]
> and
> [tested|http://reactivex.io/RxJava/javadoc/rx/observers/TestSubscriber.html]
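With backpressure, the consumer decides how much data is in flight, not the producer. The sketch below uses the JDK's Reactive Streams interfaces (`java.util.concurrent.Flow`, Java 9+) rather than RxJava's backpressure strategies. In it, the subscriber requests one item at a time and asks for the next only after it has handled the current one. `SubmissionPublisher` buffers pending items and makes `submit` block once the subscriber's buffer is full, so a slow subscriber is never overrun. The class name and the one-item demand are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;

// Subscriber that never has more than one outstanding request.
final class OneAtATimeSubscriber<T> implements Flow.Subscriber<T> {
    // Signals are delivered serially, so a plain ArrayList is sufficient here.
    private final List<T> received = new ArrayList<>();
    final CompletableFuture<List<T>> done = new CompletableFuture<>();
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        s.request(1); // initial demand: a single item
    }

    @Override
    public void onNext(T item) {
        received.add(item);      // "process" the item
        subscription.request(1); // only then signal demand for the next one
    }

    @Override
    public void onError(Throwable t) {
        done.completeExceptionally(t);
    }

    @Override
    public void onComplete() {
        done.complete(received);
    }

    // Publishes items through a publisher whose per-subscriber buffer holds roughly
    // bufferCapacity items (it may be rounded up to a power of two), then waits.
    static <E> List<E> run(List<E> items, int bufferCapacity) {
        OneAtATimeSubscriber<E> subscriber = new OneAtATimeSubscriber<>();
        try (SubmissionPublisher<E> publisher =
                     new SubmissionPublisher<>(ForkJoinPool.commonPool(), bufferCapacity)) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit); // submit() blocks while the buffer is full
        } // close() issues onComplete after the buffered items are delivered
        return subscriber.done.join();
    }
}
```

Raising the `request(n)` value trades memory for throughput. The other bullets (timeouts, windowing) are similarly small changes on either API.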
> Another plus is the community surrounding RxJava; specifically, our good
> friends at Netflix authored it and use it extensively. The docs and examples
> are good.
> To get the most out of this, we will need to take this API further into the
> code: MessagingService, Disk Access/Page, Cache, Thread per core... but
> again, I want to hammer home that this can be achieved incrementally.
> On the bad side:
> * We lock ourselves into a "framework"
> * We will inevitably hit bugs / performance issues that need fixing upstream
> * Some of the more advanced API uses look pretty mentally taxing/hard to
> grasp
> Which brings us to the alternatives, primarily just using
> CompletableFutures.
> We certainly could, but if you look at the code changes I had to make to turn
> the SP calls asynchronous, I think you will realize you would need to pass
> all kinds of state around to get the read command callback to start the netty
> write, whereas Observables make that pipeline declarative. More advanced
> things like backpressure and message passing between N:M producers and
> consumers also become complex. This isn't to say we can't [use
> both|http://www.nurkiewicz.com/2014/11/converting-between-completablefuture.html]
> where Observables are overkill.
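For comparison, the sketch below shows roughly what the fan-out/gather step of the new `fetchRows` looks like with plain `CompletableFuture`s. It makes two assumptions: `readPartition` is a hypothetical async read returning one partition's rows, and strings stand in for `PartitionIterator`. The simple case stays readable. The extra state-passing described above appears once later stages need context from earlier ones, such as the netty channel to write to. That context would all have to be captured in lambdas or threaded through explicitly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

final class FutureFanOut {
    // Hypothetical async read of a single partition (stands in for one
    // SinglePartitionReadLifecycle); completes on the common ForkJoinPool.
    static CompletableFuture<List<String>> readPartition(String key) {
        return CompletableFuture.supplyAsync(() -> List.of(key + ":row1", key + ":row2"));
    }

    // Start every read, then concatenate the results in command order once all complete.
    static CompletableFuture<List<String>> fetchRows(List<String> keys) {
        List<CompletableFuture<List<String>>> reads = keys.stream()
                .map(FutureFanOut::readPartition)
                .collect(Collectors.toList());
        return CompletableFuture.allOf(reads.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> {
                    List<String> all = new ArrayList<>();
                    for (CompletableFuture<List<String>> read : reads) {
                        all.addAll(read.join()); // already complete, so join() does not block
                    }
                    return all;
                });
    }
}
```

Iterating `reads` in list order keeps the results in command order, as the old `PartitionIterators.concat` loop did. In RxJava, the analogous choice is `concatMap` rather than `flatMap`, because `flatMap` may interleave inner results in completion order. If any read fails, `allOf` completes exceptionally and the failure propagates to the returned future.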
> I hope this ticket sparks some good discussion!
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)