[
https://issues.apache.org/jira/browse/CASSANDRA-10528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
T Jake Luciani updated CASSANDRA-10528:
---------------------------------------
Description:
The purpose of this ticket is to discuss the merits of integrating the
[RxJava|https://github.com/ReactiveX/RxJava] framework into C*, enabling us to
incrementally make the internals of C* async and move away from SEDA to a more
modern thread-per-core architecture.
Related tickets:
* CASSANDRA-8520
* CASSANDRA-8457
* CASSANDRA-5239
* CASSANDRA-7040
* CASSANDRA-5863
* CASSANDRA-6696
* CASSANDRA-7392
My *primary* goals in raising this issue are to provide a way of:
* *Incrementally* making the backend async
* Avoiding code complexity/readability issues
* Avoiding NIH where possible
* Building on an extendable library
My *non*-goals in raising this issue are:
* Rewriting the entire database in one big bang
* Writing our own async api/framework
-------------------------------------------------------------------------------------
I attempted to integrate RxJava a while back and found it not ready, mainly
due to our lack of lambda support. Now, with Java 8, I've found it very
enjoyable and have not hit any performance issues. A gentle introduction to
RxJava is [here|http://blog.danlew.net/2014/09/15/grokking-rxjava-part-1/], and
there is more on their
[wiki|https://github.com/ReactiveX/RxJava/wiki/Additional-Reading]. The
primary concept of Rx is the
[Observable|http://reactivex.io/documentation/observable.html], which is
essentially a stream of stuff you can subscribe to, act on, chain, etc. This
is quite similar to the [Java 8 streams
api|http://www.oracle.com/technetwork/articles/java/ma14-java-se-8-streams-2177646.html]
(or I should say the streams api is similar to it). The difference is that
Java 8 streams can't be used for asynchronous events, while RxJava can.
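To make the pull versus push distinction concrete, here is a minimal sketch. It does not use RxJava: as a stand-in it uses the JDK's own {{java.util.concurrent.Flow}} and {{SubmissionPublisher}} (Java 9+, so newer than the Java 8 discussed here), and the class and method names are hypothetical. The {{pull}} method is consumed synchronously by the caller, while in {{push}} the subscriber is called back as items are published.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

// Hypothetical illustration only; not RxJava and not Cassandra code.
class PushVersusPullSketch {
    // Pull: a Java 8 stream is drained synchronously by the calling thread.
    static List<String> pull(List<String> items) {
        return items.stream().map(String::toUpperCase).collect(Collectors.toList());
    }

    // Push: the subscriber is called back as items are published,
    // by default on a thread other than the caller's.
    static List<String> push(List<String> items) {
        List<String> seen = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
                @Override public void onNext(String item) { seen.add(item.toUpperCase()); }
                @Override public void onError(Throwable t) { done.countDown(); }
                @Override public void onComplete() { done.countDown(); }
            });
            items.forEach(publisher::submit);
        } // close() signals onComplete once buffered items have been delivered
        try {
            // Only this demo blocks here, so that it can return the collected items.
            if (!done.await(5, TimeUnit.SECONDS)) throw new IllegalStateException("timed out");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(seen);
    }
}
```

Both methods produce the same items; the difference is who drives the iteration, which is what matters for asynchronous events.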
Another improvement since I last tried integrating RxJava is the completion of
CASSANDRA-8099, which provides a very iterable/incremental approach to our
storage engine. *Iterators and Observables are well paired conceptually, so
morphing our current storage engine to be async is much simpler now.*
In an effort to show how one can incrementally change our backend, I've done a
quick POC with RxJava and made our non-paging read requests non-blocking.
https://github.com/apache/cassandra/compare/trunk...tjake:rxjava-3.0
As you can probably see, the code is straightforward and sometimes quite nice!
*Old*
{code}
private static PartitionIterator fetchRows(List<SinglePartitionReadCommand<?>> commands,
                                           ConsistencyLevel consistencyLevel)
throws UnavailableException, ReadFailureException, ReadTimeoutException
{
    int cmdCount = commands.size();

    SinglePartitionReadLifecycle[] reads = new SinglePartitionReadLifecycle[cmdCount];
    for (int i = 0; i < cmdCount; i++)
        reads[i] = new SinglePartitionReadLifecycle(commands.get(i), consistencyLevel);

    for (int i = 0; i < cmdCount; i++)
        reads[i].doInitialQueries();

    for (int i = 0; i < cmdCount; i++)
        reads[i].maybeTryAdditionalReplicas();

    for (int i = 0; i < cmdCount; i++)
        reads[i].awaitResultsAndRetryOnDigestMismatch();

    for (int i = 0; i < cmdCount; i++)
        if (!reads[i].isDone())
            reads[i].maybeAwaitFullDataRead();

    List<PartitionIterator> results = new ArrayList<>(cmdCount);
    for (int i = 0; i < cmdCount; i++)
    {
        assert reads[i].isDone();
        results.add(reads[i].getResult());
    }

    return PartitionIterators.concat(results);
}
{code}
*New*
{code}
private static Observable<PartitionIterator> fetchRows(List<SinglePartitionReadCommand<?>> commands,
                                                       ConsistencyLevel consistencyLevel)
throws UnavailableException, ReadFailureException, ReadTimeoutException
{
    return Observable.from(commands)
                     .map(command -> new SinglePartitionReadLifecycle(command, consistencyLevel))
                     .flatMap(read -> read.getPartitionIterator())
                     .toList()
                     .map(results -> PartitionIterators.concat(results));
}
{code}
Since the read call is now non-blocking (no more future.get()), we can remove
one thread pool hop from the native netty request pool, which yields a
non-trivial improvement to read performance.
!rxjava-stress.png|width=800px!
http://cstar.datastax.com/tests/id/ae648c12-729a-11e5-8625-0256e416528f
At the same time, the current Iterator-based api still works by calling
{{.toBlocking()}} on the observable. So, for example, the existing thrift read
call requires little modification.
On the async side we get the added benefits of RxJava:
* Customizable backpressure strategies (for dealing with streams that can't
be processed quickly enough)
* Cancelling of work due to timeouts is a 1 line change
* When a Subscriber disconnects from the stream, the Observable stops as well
* Batching/windowing of work can be added in one line
* Observers and Subscribers can do work across any thread at any stage of the
pipeline
* Observables can be [debugged|https://github.com/ReactiveX/RxJavaDebug] and
[tested|http://reactivex.io/RxJava/javadoc/rx/observers/TestSubscriber.html]
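Two of the points above, backpressure and stopping work when a subscriber disconnects, can be sketched without RxJava. The following is only an analogy, assuming the JDK's {{java.util.concurrent.Flow}} API (Java 9+) as a stand-in; the class and method names are hypothetical, and RxJava's own operators differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only; not RxJava and not Cassandra code.
class CancelAndBackpressureSketch {
    // Publishes the integers 0..published-1. The subscriber requests one item
    // at a time and cancels after `limit` items (limit must be >= 1).
    static List<Integer> takeThenCancel(int published, int limit) {
        List<Integer> seen = new ArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                private Flow.Subscription subscription;

                @Override public void onSubscribe(Flow.Subscription s) {
                    subscription = s;
                    s.request(1);               // backpressure: ask for a single item
                }
                @Override public void onNext(Integer item) {
                    seen.add(item);
                    if (seen.size() >= limit) {
                        subscription.cancel();  // disconnect: nothing further is delivered
                        done.countDown();
                    } else {
                        subscription.request(1);
                    }
                }
                @Override public void onError(Throwable t) { done.countDown(); }
                @Override public void onComplete() { done.countDown(); }
            });
            for (int i = 0; i < published; i++) publisher.submit(i);
        }
        try {
            if (!done.await(5, TimeUnit.SECONDS)) throw new IllegalStateException("timed out");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }
}
```

Because the subscriber never has more than one outstanding request, the publisher cannot overrun it, and once it cancels, the remaining published items are never delivered to it.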
Another plus is the community surrounding RxJava; specifically, our good friends
at Netflix have authored and used it extensively. Docs and examples are good.
In order to get the most out of this we will need to take this api further into
the code: MessagingService, Disk Access/Page, Cache, Thread per core... but
again, I want to hammer home that this can be achieved incrementally.
On the bad side:
* We would be locking into a "framework"
* We will inevitably hit bugs / performance issues that need to be fixed upstream
* Some of the more advanced API uses look pretty mentally taxing/hard to grasp
Which brings us to the alternatives, the primary one being to just use
CompletableFutures.
We certainly could, but if you look at the code changes I had to make to make
the SP calls asynchronous, I think you will realize you would need to pass
all kinds of state around to get the read command callback to start the netty
write, whereas observables make that pipeline declarative. Also, more advanced
things like backpressure and message passing between N:M producers and
consumers become complex. This isn't to say we can't [use
both|http://www.nurkiewicz.com/2014/11/converting-between-completablefuture.html]
if Observables are overkill.
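For comparison, here is a rough sketch of the same fan-out-and-collect shape written with plain {{CompletableFuture}}. It is not taken from the POC branch: {{readAsync}} is a hypothetical stand-in for one asynchronous partition read, and strings stand in for partitions and rows.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Hypothetical illustration only; not Cassandra code.
class FutureFanOutSketch {
    // Stand-in for one asynchronous partition read (assumption: returns two fake rows).
    static CompletableFuture<List<String>> readAsync(String key) {
        return CompletableFuture.supplyAsync(() -> {
            List<String> rows = new ArrayList<>();
            rows.add(key + ":row1");
            rows.add(key + ":row2");
            return rows;
        });
    }

    // Starts every read up front, then concatenates the results in command
    // order once all of them have completed. No thread blocks while waiting.
    static CompletableFuture<List<String>> fetchRows(List<String> keys) {
        List<CompletableFuture<List<String>>> reads =
            keys.stream().map(FutureFanOutSketch::readAsync).collect(Collectors.toList());
        return CompletableFuture.allOf(reads.toArray(new CompletableFuture[0]))
            .thenApply(ignored -> reads.stream()
                .flatMap(f -> f.join().stream()) // join() cannot block here: all reads are done
                .collect(Collectors.toList()));
    }
}
```

A legacy blocking caller can still call {{fetchRows(keys).join()}}, much like {{.toBlocking()}} on an observable. This simple case composes cleanly; the state-passing concern raised above shows up once retries, timeouts, and the final netty write have to be threaded through the callbacks.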
I hope this ticket sparks some good discussion!
> Proposal: Integrate RxJava
> --------------------------
>
> Key: CASSANDRA-10528
> URL: https://issues.apache.org/jira/browse/CASSANDRA-10528
> Project: Cassandra
> Issue Type: Improvement
> Reporter: T Jake Luciani
> Fix For: 3.x
>
> Attachments: rxjava-stress.png
>
>
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)