I hadn't put together that each compilation could have its own processor. Very cool. Thanks for the benchmarking numbers. I had a tp3-inspired JMH-based module in progress when I saw your results, so I added the two test traversals in. It doesn't do any parameterization of input sizes at this point, but if you're interested in checking it out I pushed it to the tp4-jmh branch: https://github.com/apache/tinkerpop/blob/tp4-jmh/java/machine/machine-perf-test/src/main/java/org/apache/tinkerpop/benchmark/util/AbstractTraversalBenchmarkBase.java .
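For anyone unfamiliar with what the harness reports: JMH's sample mode times individual invocations and reports percentiles (p0.50, p0.90, ...) over those samples, after a set of discarded warmup iterations. A rough, dependency-free sketch of that idea (this is not the actual JMH module — the class name and toy workload are invented for illustration):

```java
import java.util.Arrays;

public class MedianBenchmark {

    // Run the workload for `warmup` discarded iterations, then time
    // `measure` iterations and return the median (p0.50) in ms/op.
    // The counts mirror the -DwarmupIterations/-DmeasureIterations flags.
    public static double medianMillis(Runnable workload, int warmup, int measure) {
        for (int i = 0; i < warmup; i++) {
            workload.run(); // discarded: lets the JIT and caches warm up
        }
        double[] samples = new double[measure];
        for (int i = 0; i < measure; i++) {
            long start = System.nanoTime();
            workload.run();
            samples[i] = (System.nanoTime() - start) / 1_000_000.0;
        }
        return median(samples);
    }

    public static double median(double[] samples) {
        double[] sorted = samples.clone();
        Arrays.sort(sorted);
        int n = sorted.length;
        return (n % 2 == 1)
                ? sorted[n / 2]
                : (sorted[n / 2 - 1] + sorted[n / 2]) / 2.0;
    }

    public static void main(String[] args) {
        // Toy workload standing in for a traversal execution such as
        // g.inject(input).unfold().incr()...iterate()
        Runnable workload = () -> {
            long sum = 0;
            for (int i = 0; i < 1_000; i++) sum += i;
            if (sum != 499_500) throw new IllegalStateException();
        };
        System.out.printf("p0.50 sample %.3f ms/op%n", medianMillis(workload, 5, 5));
    }
}
```

The real module reports the full percentile spread, throughput, and averages; this only shows the warmup/measure/median shape of a run.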
The benchmarks can be run from the command line or from the IDE using the individual process tests: https://github.com/apache/tinkerpop/tree/tp4-jmh/java/machine/machine-perf-test/src/main/java/org/apache/tinkerpop/benchmark/machine . Here are the median times for input size = 1000 that I pulled from a run. The full output includes the P90, P95, etc., and it can also be set to report an average or raw throughput:

mvn clean test -DskipBenchmarks=false -Dforks=1 -DmeasureIterations=5 -DwarmupIterations=5

RxSerialTraversalBenchmark.g_inject_unfold_incr_incr_incr_incr:g_inject_unfold_incr_incr_incr_incr·p0.50 sample 6.988 ms/op
RxParallelTraversalBenchmark.g_inject_unfold_incr_incr_incr_incr:g_inject_unfold_incr_incr_incr_incr·p0.50 sample 11.633 ms/op
PipesTraversalBenchmark.g_inject_unfold_incr_incr_incr_incr:g_inject_unfold_incr_incr_incr_incr·p0.50 sample 6.627 ms/op
RxSerialTraversalBenchmark.g_inject_unfold_repeat_times:g_inject_unfold_repeat_times·p0.50 sample 3.592 ms/op
RxParallelTraversalBenchmark.g_inject_unfold_repeat_times:g_inject_unfold_repeat_times·p0.50 sample 7.897 ms/op
PipesTraversalBenchmark.g_inject_unfold_repeat_times:g_inject_unfold_repeat_times·p0.50 sample 3.887 ms/op

JMH is great, but the defaults will have it do a ton of timing runs, which is slow, so for quicker (but less accurate) runs the measureIterations and warmupIterations can be decreased.

--Ted

On Mon, Apr 8, 2019 at 12:16 PM Marko Rodriguez <[email protected]> wrote:

> Hi,
>
> I implemented multi-threaded RxJava this morning — it's called
> ParallelRxJava. Single-threaded is called SerialRxJava.
>
> The RxJavaProcessor factory will generate either depending on the Map.of()
> configuration:
>
> https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaProcessor.java#L49-L53
>
> You can see the source code for each RxJava implementation here:
>
> https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
> https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
>
> Given Ted’s comments last week, I decided to create a micro-benchmark
> @Test to compare SerialRxJava, ParallelRxJava, and Pipes:
>
> https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaBenchmark.java
>
> The results are below. My notes on the results are as follows:
>
> * ParallelRxJava used 7 threads.
> * Times are averaged over 30 runs (minus the first 2 runs — JVM warmup).
> * SerialRxJava and Pipes are very close on non-branching traversals
> with a small input set.
> * ParallelRxJava never beats Pipes, but it does beat SerialRxJava on
> large input sets.
> * My implementation of repeat() in RxJava is not good. I need to
> think of a better way to implement recursion (and branching in general).
> * ParallelRxJava will probably shine when the query has a lot of
> database operations (e.g. out(), inE(), addV(), etc.).
> * There is a lot of intelligence to add to ParallelRxJava — e.g.,
> ** If the nested traversal is simple (only a few steps),
> don’t thread. For example, there is no need to thread the is() of
> choose(is(gt(3)),….).
> ** One of the beautiful things about TP4 is that each
> Compilation (nest) can have a different processor.
> *** Thus, parallel for long sequences and serial
> for short sequences… or, Pipes for short sequences! (Beam uses Pipes for
> short sequences.)
>
> ———
>
> g.inject(input).unfold().incr().incr().incr().incr().iterate()
>
> Input size: 10
> Average time [seri]: 0.4
> Average time [para]: 2.4
> Average time [pipe]: 0.5
>
> Input size: 100
> Average time [seri]: 0.96666664
> Average time [para]: 4.3333335
> Average time [pipe]: 0.8
>
> Input size: 1000
> Average time [seri]: 2.5333333
> Average time [para]: 4.1666665
> Average time [pipe]: 1.7
>
> Input size: 10000
> Average time [seri]: 12.1
> Average time [para]: 10.633333
> Average time [pipe]: 8.1
>
> Input size: 100000
> Average time [seri]: 103.96667
> Average time [para]: 95.066666
> Average time [pipe]: 59.933334
>
> ——————
> ——————
>
> g.inject(input).unfold().repeat(incr()).times(4).iterate()
>
> Input size: 10
> Average time [seri]: 1.3333334
> Average time [para]: 4.8
> Average time [pipe]: 0.8333333
>
> Input size: 100
> Average time [seri]: 2.9
> Average time [para]: 8.866667
> Average time [pipe]: 1.0333333
>
> Input size: 1000
> Average time [seri]: 15.7
> Average time [para]: 22.066668
> Average time [pipe]: 3.4
>
> Input size: 10000
> Average time [seri]: 50.4
> Average time [para]: 35.8
> Average time [pipe]: 8.566667
>
> Input size: 100000
> Average time [seri]: 387.06668
> Average time [para]: 271.2
> Average time [pipe]: 60.566666
>
> ——————
> ——————
>
> One of the reasons for implementing a multi-threaded single-machine
> processor was to see how threading would work with the intermediate
> CFunction representation. At first, I thought I was going to have to make
> CFunctions thread-safe (as they can nest and can contain Compilations), but
> then I realized we provide a clone() method. Thus, it’s up to the processor
> to clone CFunctions accordingly across threads (“rails” in RxJava). For
> ParallelRxJava, it’s as simple as using ThreadLocal. Here is the MapFlow
> ReactiveX Function:
>
> https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/MapFlow.java
>
> After using ThreadLocal in the map, flatmap, filter, and branch flows,
> everything just worked! I don’t know if this is the best idea, but it’s
> simple and it pushes multi-threaded query concerns to the processors as
> opposed to the TP4 VM. Given the numerous ways in which threading could be
> implemented, it seems that this shouldn’t be a TP4 VM concern. Thoughts?
>
> Thanks for reading,
> Marko.
>
> http://rredux.com
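The clone()-per-thread trick Marko describes can be sketched in plain Java. Everything below is an illustrative stand-in — IncrFunction is not the actual TP4 CFunction, and this uses an ExecutorService rather than RxJava rails — but the mechanism is the same: a ThreadLocal seeded with prototype::clone gives each worker thread its own copy of the stateful function, so the prototype is never mutated concurrently and no locking is needed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ThreadLocalClone {

    // Stateful, non-thread-safe function with a clone() method,
    // standing in for a CFunction (illustrative, not the TP4 class).
    static class IncrFunction implements Cloneable {
        private long invocations = 0; // per-clone mutable state

        long apply(long input) {
            invocations++;
            return input + 1;
        }

        @Override
        public IncrFunction clone() {
            return new IncrFunction(); // fresh state for each rail
        }
    }

    // Each worker thread ("rail") lazily receives its own clone via
    // ThreadLocal, so the shared prototype is never touched concurrently.
    public static long run(int threads, int itemsPerThread) throws Exception {
        IncrFunction prototype = new IncrFunction();
        ThreadLocal<IncrFunction> local = ThreadLocal.withInitial(prototype::clone);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        List<Future<Long>> futures = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            futures.add(pool.submit(() -> {
                long sum = 0;
                for (long i = 0; i < itemsPerThread; i++) {
                    sum += local.get().apply(i); // per-thread clone, no locks
                }
                return sum;
            }));
        }
        long total = 0;
        for (Future<Long> f : futures) {
            total += f.get();
        }
        pool.shutdown();
        return total; // threads * (1 + 2 + ... + itemsPerThread)
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run(4, 10)); // 4 threads * 55 = 220
    }
}
```

The trade-off Marko raises holds here too: the cloning policy lives entirely in the "processor" (the executor code), not in the function representation, which is what keeps threading out of the VM's concerns.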
