Hi,
I implemented a multi-threaded RxJava processor this morning; it's called ParallelRxJava.
The single-threaded processor is called SerialRxJava.
The RxJavaProcessor factory will generate one or the other depending on the
Map.of() configuration:
https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaProcessor.java#L49-L53
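Roughly, the two configurations look like this (a sketch only -- the "rxjava.threads"
key below is my shorthand; the real constant is defined in RxJavaProcessor at the
lines linked above):

    import java.util.Map;

    public final class RxJavaConfigSketch {
        // "rxjava.threads" is a stand-in key, not the actual TP4 constant
        static final Map<String, Object> SERIAL_CONFIG = Map.of();                      // no thread count -> SerialRxJava
        static final Map<String, Object> PARALLEL_CONFIG = Map.of("rxjava.threads", 7); // thread count    -> ParallelRxJava
    }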
You can see the source code for each RxJava implementation here:
https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
Given Ted’s comments last week, I decided to create a micro-benchmark @Test to
compare SerialRxJava, ParallelRxJava, and Pipes.
https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaBenchmark.java
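The timing approach is nothing fancy; the general shape is roughly this (a simplified
sketch, not the actual benchmark source -- I time in milliseconds and average the runs
after discarding the warmup runs, per the notes below):

    import java.util.ArrayList;
    import java.util.List;

    public final class BenchSketch {
        // run the traversal `runs` times, discard the first `warmup` runs, average the rest
        static double averageMillis(final Runnable traversal, final int runs, final int warmup) {
            final List<Double> times = new ArrayList<>();
            for (int i = 0; i < runs; i++) {
                final long start = System.nanoTime();
                traversal.run(); // e.g. () -> g.inject(input).unfold().incr().iterate()
                final double millis = (System.nanoTime() - start) / 1_000_000.0;
                if (i >= warmup) times.add(millis);
            }
            return times.stream().mapToDouble(Double::doubleValue).average().orElse(0.0);
        }
    }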
The results are below. My notes on the results are as follows:
* ParallelRxJava used 7 threads.
* Times averaged over 30 runs (minus the first 2 runs — JVM warmup).
* SerialRxJava and Pipes are very close on non-branching traversals
with a small input set.
* ParallelRxJava never beats Pipes, but does beat SerialRxJava on large
input sets.
* My implementation of repeat() in RxJava is not good. I need to think
of a better way to implement recursion (and branching in general).
* ParallelRxJava will probably shine when the query has a lot of
database operations (e.g. out(), inE(), addV(), etc.).
* There is a lot of intelligence to add to ParallelRxJava, e.g.:
** If the nested traversal is simple (only a few steps), don’t
thread. For example, there is no need to thread the is() of
choose(is(gt(3)), ...).
** One of the beautiful things about TP4 is that each
Compilation (nest) can have a different processor.
*** Thus, parallel for long sequences and serial (or even
Pipes) for short sequences. (Beam uses Pipes for short sequences.) A tiny
sketch of this idea follows these notes, right before the numbers.
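To make that last point concrete, the sort of heuristic I have in mind looks
something like this (everything here is hypothetical -- the enum, the thresholds,
and the inputs are stand-ins, not TP4 API):

    public final class NestHeuristicSketch {
        enum NestProcessor { PIPES, SERIAL_RXJAVA, PARALLEL_RXJAVA }

        // tiny nests (e.g. the is() inside choose(is(gt(3)), ...)) aren't worth scheduling
        // onto rails; longer nests over large inputs are where parallel rails should pay off
        static NestProcessor choose(final int functionCount, final long expectedInputSize) {
            if (functionCount <= 2) return NestProcessor.PIPES;
            if (expectedInputSize < 10_000) return NestProcessor.SERIAL_RXJAVA;
            return NestProcessor.PARALLEL_RXJAVA;
        }
    }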
———
g.inject(input).unfold().incr().incr().incr().incr().iterate()
Input size: 10
Average time [seri]: 0.4
Average time [para]: 2.4
Average time [pipe]: 0.5
Input size: 100
Average time [seri]: 0.96666664
Average time [para]: 4.3333335
Average time [pipe]: 0.8
Input size: 1000
Average time [seri]: 2.5333333
Average time [para]: 4.1666665
Average time [pipe]: 1.7
Input size: 10000
Average time [seri]: 12.1
Average time [para]: 10.633333
Average time [pipe]: 8.1
Input size: 100000
Average time [seri]: 103.96667
Average time [para]: 95.066666
Average time [pipe]: 59.933334
——————
——————
g.inject(input).unfold().repeat(incr()).times(4).iterate()
Input size: 10
Average time [seri]: 1.3333334
Average time [para]: 4.8
Average time [pipe]: 0.8333333
Input size: 100
Average time [seri]: 2.9
Average time [para]: 8.866667
Average time [pipe]: 1.0333333
Input size: 1000
Average time [seri]: 15.7
Average time [para]: 22.066668
Average time [pipe]: 3.4
Input size: 10000
Average time [seri]: 50.4
Average time [para]: 35.8
Average time [pipe]: 8.566667
Input size: 100000
Average time [seri]: 387.06668
Average time [para]: 271.2
Average time [pipe]: 60.566666
——————
——————
One of the reasons for implementing a multi-threaded single machine processor
was to see how threading would work with the intermediate CFunction
representation. At first, I thought I was going to have to make CFunctions
thread safe (as they can nest and can contain Compilations), but then I
realized we provide a clone() method. Thus, it's up to the processor to clone
CFunctions accordingly across threads (“rails” in RxJava). For ParallelRxJava,
it's as simple as using ThreadLocal. Here is the MapFlow ReactiveX Function:
https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/MapFlow.java
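In spirit, the pattern in those flows is this (a simplified sketch, not the MapFlow
source verbatim -- it assumes RxJava 2's io.reactivex.functions.Function, and the
MapLikeFunction interface below is a stand-in for the TP4 map-style CFunction):

    import io.reactivex.functions.Function;

    public final class MapFlowSketch<S, E> implements Function<S, E> {

        // hypothetical stand-in for the TP4 map-style CFunction
        interface MapLikeFunction<S, E> {
            E apply(S traverser);
            MapLikeFunction<S, E> clone();
        }

        private final MapLikeFunction<S, E> baseFunction;
        // each RxJava rail (thread) lazily gets its own clone of the CFunction,
        // so nested Compilations never need to be thread-safe
        private final ThreadLocal<MapLikeFunction<S, E>> threadFunction;

        public MapFlowSketch(final MapLikeFunction<S, E> baseFunction) {
            this.baseFunction = baseFunction;
            this.threadFunction = ThreadLocal.withInitial(this.baseFunction::clone);
        }

        @Override
        public E apply(final S traverser) {
            return this.threadFunction.get().apply(traverser);
        }
    }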
After using ThreadLocal in the map, flatmap, filter, and branch flows,
everything just worked! I don’t know if this is the best idea, but it’s simple
and it pushes multi-threaded query concerns to the processors as opposed to the
TP4 VM. Given the numerous ways in which threading could be implemented, it
seems that this shouldn’t be a TP4 VM concern. Thoughts?
Thanks for reading,
Marko.
http://rredux.com