Hi,

Stephen and Bryn were looking over my RxJava implementation the other day and 
Bryn, with his British accent, was like [I paraphrase]:

        “Whoa dawg! Bro should like totally not be blocking to fill an 
iterator. Gnar gnar for surezies.”

Prior to now, Processor implemented Iterator<Traverser>, where for RxJava, when 
you do next()/hasNext() if there were no results in the queue and the flowable 
was still running, then the iterator while()-blocks waiting for a result or for 
the flowable to terminate.

This morning I decided to redo the Processor interface (and respective 
implementations) and it is much nicer now. We have two “execute” methods:

Iterator<Traverser>     Processor.iterator(Iterator<Traverser> starts)
void Processor.subscribe(Iterator<Traverser> starts, Consumer<Traverser> 
consumer)

A processor can only be executed using one of the methods above. Thus, 
depending on context and the underlying processor, the VM determines whether to 
use pull-based or push-based semantics. Pretty neat, eh?

        
https://github.com/apache/tinkerpop/blob/tp4/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/processor/Processor.java
 
<https://github.com/apache/tinkerpop/blob/tp4/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/processor/Processor.java>

Check out how I do Pipes:

        
https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/Pipes.java#L113-L126
 
<https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/Pipes.java#L113-L126>

Pipes is inherently pull-based. However, to simulate push-based semantics, I 
Thread().start() the iterator.hasNext()/next() and just consume.accept() the 
results. Thus, as desired, subscribe() returns immediately.

Next, here is my RxJava implementation.

        
https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java#L59-L65
 
<https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java#L59-L65>
        
https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java#L66-L86
 
<https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java#L66-L86>

You can see how I turn a push-based subscription into a pull-based iteration 
using the good ‘ol while()-block :).

        
https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java#L98-L102
 
<https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java#L98-L102>

——

What I need to do next is to redo the RxJava execution planner such that nested 
traversals (e.g. map(out()))) are subscription-based with the parent flowable. 
I don’t quite know how I will do it — but I believe I will have to write custom 
Publisher/Subscriber objects for use with Flowable.compose() such that onNext() 
and onComplete() will be called accordingly within the consumer.accept(). It 
will be tricky as I’m not too good with low-level RxJava, but thems the breaks.

Please note that my push-based conceptual skills are not the sharpest so if 
anyone has any recommendations, please advise.

Take care,
Marko.

http://rredux.com <http://rredux.com/>




Reply via email to