Hi, Stephen and Bryn were looking over my RxJava implementation the other day and Bryn, with his British accent, was like [I paraphrase]:
“Whoa dawg! Bro should like totally not be blocking to fill an iterator. Gnar gnar for surezies.” Prior to now, Processor implemented Iterator<Traverser>, where for RxJava, when you do next()/hasNext() if there were no results in the queue and the flowable was still running, then the iterator while()-blocks waiting for a result or for the flowable to terminate. This morning I decided to redo the Processor interface (and respective implementations) and it is much nicer now. We have two “execute” methods: Iterator<Traverser> Processor.iterator(Iterator<Traverser> starts) void Processor.subscribe(Iterator<Traverser> starts, Consumer<Traverser> consumer) A processor can only be executed using one of the methods above. Thus, depending on context and the underlying processor, the VM determines whether to use pull-based or push-based semantics. Pretty neat, eh? https://github.com/apache/tinkerpop/blob/tp4/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/processor/Processor.java <https://github.com/apache/tinkerpop/blob/tp4/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/processor/Processor.java> Check out how I do Pipes: https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/Pipes.java#L113-L126 <https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/Pipes.java#L113-L126> Pipes is inherently pull-based. However, to simulate push-based semantics, I Thread().start() the iterator.hasNext()/next() and just consume.accept() the results. Thus, as desired, subscribe() returns immediately. Next, here is my RxJava implementation. https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java#L59-L65 <https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java#L59-L65> https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java#L66-L86 <https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java#L66-L86> You can see how I turn a push-based subscription into a pull-based iteration using the good ‘ol while()-block :). https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java#L98-L102 <https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java#L98-L102> —— What I need to do next is to redo the RxJava execution planner such that nested traversals (e.g. map(out()))) are subscription-based with the parent flowable. I don’t quite know how I will do it — but I believe I will have to write custom Publisher/Subscriber objects for use with Flowable.compose() such that onNext() and onComplete() will be called accordingly within the consumer.accept(). It will be tricky as I’m not too good with low-level RxJava, but thems the breaks. Please note that my push-based conceptual skills are not the sharpest so if anyone has any recommendations, please advise. Take care, Marko. http://rredux.com <http://rredux.com/>