On 2019/04/24 12:19:54, Marko Rodriguez <okramma...@gmail.com> wrote: 
> Hello,
> 
> > I think it would be better to either expose Flowable on the API (or Flow if 
> > you don't want to be tied in to RxJava)
> 
> We definitely don’t want to expose anything provider-specific, especially
> at the Processor interface level. I note your reference to the Flow API in
> java.util.concurrent and have noticed that RxJava's Subscriber, Subscription,
> etc. mirror the java.util.concurrent.Flow interfaces (both follow the Reactive
> Streams specification). I will dig deeper.
> 
> > 1. Using Consumer will break the Rx chain. This is undesirable as it will 
> > prevent backpressure and cancellation from working properly.
> 
> Understood about breaking the chain.
> 
> > 2. The Scheduler to run the traversal on can be set. For instance, in the 
> > case where only certain threads are allowed to perform IO once the user has 
> > the Flowable they can call subscribeOn before subscribe.
> > 3. Backpressure strategy can be set, such as dropping results on buffer 
> > overflow.
> > 4. Buffer size can be set.
> 
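For comparison while studying the Flow API: the JDK's own java.util.concurrent.SubmissionPublisher (not RxJava) has knobs roughly analogous to points 2-4: an Executor that decides where delivery runs, a per-subscriber maxBufferCapacity (the JDK may round it up to a power of two), and offer(item, onDrop), which drops items instead of blocking when that buffer is full. The FlowKnobs class and its offerWithDrop helper are hypothetical names; this is a sketch of the concepts only.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical illustration using the JDK's SubmissionPublisher, not RxJava.
final class FlowKnobs {
    private FlowKnobs() {}

    // Offers `count` items to a subscriber that never requests any and returns
    // how many items the publisher dropped because the bounded buffer was full.
    static int offerWithDrop(int count, int maxBufferCapacity) {
        // "Scheduler": the executor on which delivery to subscribers runs.
        // Daemon threads so a stalled delivery task can never keep the JVM alive.
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "flow-delivery");
            t.setDaemon(true);
            return t;
        });
        // "Buffer size": maximum number of undelivered items per subscriber.
        try (SubmissionPublisher<Integer> publisher =
                     new SubmissionPublisher<>(executor, maxBufferCapacity)) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                @Override public void onSubscribe(Flow.Subscription s) { /* deliberately request nothing */ }
                @Override public void onNext(Integer item) { }
                @Override public void onError(Throwable t) { }
                @Override public void onComplete() { }
            });
            int dropped = 0;
            for (int i = 0; i < count; i++) {
                // "Backpressure strategy": onDrop returns false, meaning do not retry,
                // just drop. A negative return value is the (negative) number of drops.
                if (publisher.offer(i, (subscriber, item) -> false) < 0) dropped++;
            }
            return dropped;
        } finally {
            executor.shutdown();
        }
    }
}
```

Because the subscriber never calls request(n), nothing is consumed, so once the buffer holds its capacity every further offer is dropped rather than stalling the producer.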
> Hm. Here are my thoughts on the matter.
> 
> RxJava is just one of many Processors that will interact with TP4. If we 
> start exposing backpressure strategies, buffer sizes, etc. at the Processor 
> API level, then we expect other providers to have those concepts. Does Spark 
> support backpressure? Does Hadoop? Does Pipes? ...
> 
> I believe such provider-specific parameterization should happen via 
> language-agnostic configuration. For instance:
> 
> g = g.withProcessor(RxJavaProcessor.class,
>         Map.of("rxjava.backpressure", "drop", "rxjava.bufferSize", 2000))
> g.V().out().blah()
> 
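A minimal sketch of the provider side of that idea, assuming a plain Map<String, Object> is what reaches the processor: an RxJava-backed processor translates the "rxjava.*" keys into its own settings, so nothing RxJava-specific appears on the Processor interface. RxJavaSettings, its defaults, and the Backpressure enum (whose values echo a subset of RxJava's BackpressureStrategy) are hypothetical.

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical settings object an RxJava-backed processor might build from its config map.
final class RxJavaSettings {
    // Mirrors a subset of RxJava's BackpressureStrategy names; defined locally here.
    enum Backpressure { BUFFER, DROP, LATEST, ERROR }

    final Backpressure backpressure;
    final int bufferSize;

    private RxJavaSettings(Backpressure backpressure, int bufferSize) {
        this.backpressure = backpressure;
        this.bufferSize = bufferSize;
    }

    // Reads provider-specific keys; unknown keys are simply ignored by this provider.
    static RxJavaSettings from(Map<String, Object> config) {
        // Illustrative defaults only.
        String strategy = String.valueOf(config.getOrDefault("rxjava.backpressure", "buffer"));
        Object size = config.getOrDefault("rxjava.bufferSize", 128);
        int bufferSize = (size instanceof Number)
                ? ((Number) size).intValue()
                : Integer.parseInt(size.toString());
        if (bufferSize <= 0) {
            throw new IllegalArgumentException("rxjava.bufferSize must be positive: " + bufferSize);
        }
        // valueOf throws IllegalArgumentException for an unknown strategy name.
        return new RxJavaSettings(Backpressure.valueOf(strategy.toUpperCase(Locale.ROOT)), bufferSize);
    }
}
```

Because the map is untyped, validation has to happen at runtime inside the provider, which is one reason string-keyed configuration can become a pain point.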
> Unlike TP3, TP4 users will never interact with our Java API. They will never 
> have a reference to a Processor instance. They only talk to the TP4 VM via 
> Bytecode. However, with that said, systems that will integrate the TP4 VM 
> (e.g. database vendors, data server systems, etc.) will have to handle 
> Processor traverser results in some way (i.e. within Java). Thus, if they have
> a reactive architecture, they will want to consume results as a Flow, but we need
> to make sure that java.util.concurrent.Flow semantics don't go too far in
> demanding “unreasonable” behaviors from other Processor implementations. (I
> need to study the java.util.concurrent.Flow API.)
> 
> Thus, I see it like this:
> 
>       1. RxJava-specific configuration is not available at the Processor API
> level (only via configuration).
>       2. Drop Consumer and expose java.util.concurrent.Flow in Processor so the
> chain isn’t broken for systems integrating the TP4 VM.
>               - predicated on Flow having reasonable
> expectations of non-reactive sources (i.e. processors).
> 
> Does this make sense to you?
> 
> ———
> 
> Stephen said you commented that ParallelRxJava is not necessary. If that is
> the case, can you explain your thoughts on ParallelRxJava? My assumptions
> regarding serial vs. parallel:
> 
>       1. For TP4 VM vendors in a highly concurrent, multi-user environment, 
> multi-threading individual queries is bad.
>       2. For TP4 VM vendors in a low-concurrency, limited-user environment,
> multi-threading a single query is good.
>               - this also depends on the workload, e.g. ParallelRxJava for an AI
> system where one query at a time runs over lots of data.
> 
> Thank you for your feedback,
> Marko.
> 
> http://rredux.com
> 
> > On Apr 24, 2019, at 3:41 AM, brynco...@gmail.com wrote:
> > 
> > 
> > 
> > On 2019/04/23 13:07:09, Marko Rodriguez <okramma...@gmail.com> wrote: 
> >> Hi,
> >> 
> >> Stephen and Bryn were looking over my RxJava implementation the other day 
> >> and Bryn, with his British accent, was like [I paraphrase]:
> >> 
> >>    “Whoa dawg! Bro should like totally not be blocking to fill an 
> >> iterator. Gnar gnar for surezies.”
> >> 
> >> Until now, Processor implemented Iterator<Traverser>. For RxJava, when you
> >> called next()/hasNext() and there were no results in the queue while the
> >> flowable was still running, the iterator while()-blocked, waiting for a
> >> result or for the flowable to terminate.
> >> 
> >> This morning I decided to redo the Processor interface (and respective 
> >> implementations) and it is much nicer now. We have two “execute” methods:
> >> 
> >> Iterator<Traverser> Processor.iterator(Iterator<Traverser> starts)
> >> void                Processor.subscribe(Iterator<Traverser> starts, Consumer<Traverser> consumer)
> >> 
> >> A processor can only be executed using one of the methods above. Thus, 
> >> depending on context and the underlying processor, the VM determines 
> >> whether to use pull-based or push-based semantics. Pretty neat, eh?
> >> 
> >>    https://github.com/apache/tinkerpop/blob/tp4/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/processor/Processor.java
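A simplified sketch of the two entry points, assuming a generic T in place of Traverser. SketchProcessor and MapProcessor are hypothetical names, not the actual TP4 Processor; the one-shot guard illustrates "a processor can only be executed using one of the methods", and subscribe() here runs synchronously for brevity.

```java
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical, simplified stand-in for a processor with pull and push entry points.
interface SketchProcessor<T> {
    // Pull-based execution: the caller drives evaluation via hasNext()/next().
    Iterator<T> iterator(Iterator<T> starts);

    // Push-based execution: results are handed to the consumer as they are produced.
    void subscribe(Iterator<T> starts, Consumer<T> consumer);
}

// Toy processor applying a single step; each instance may be executed only once.
final class MapProcessor<T> implements SketchProcessor<T> {
    private final Function<T, T> step;
    private final AtomicBoolean executed = new AtomicBoolean(false);

    MapProcessor(Function<T, T> step) { this.step = step; }

    private void markExecuted() {
        if (!executed.compareAndSet(false, true)) {
            throw new IllegalStateException("processor has already been executed");
        }
    }

    @Override
    public Iterator<T> iterator(Iterator<T> starts) {
        markExecuted();
        // Lazily applies the step as the caller pulls results.
        return new Iterator<T>() {
            @Override public boolean hasNext() { return starts.hasNext(); }
            @Override public T next() { return step.apply(starts.next()); }
        };
    }

    @Override
    public void subscribe(Iterator<T> starts, Consumer<T> consumer) {
        markExecuted();
        // Synchronous for simplicity; a real push-based processor would return immediately.
        starts.forEachRemaining(t -> consumer.accept(step.apply(t)));
    }
}
```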
> >> 
> >> Check out how I do Pipes:
> >> 
> >>    https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/Pipes.java#L113-L126
> >> 
> >> Pipes is inherently pull-based. However, to simulate push-based semantics,
> >> I start a new Thread that loops over iterator.hasNext()/next() and just
> >> consumer.accept()s the results. Thus, as desired, subscribe() returns immediately.
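A minimal sketch of that thread-based push simulation, assuming results are exposed as a plain Iterator<T>. ThreadedPush is a hypothetical name, and returning the worker Thread (rather than void) is an assumption made so callers can join() it. Note that the consumer has no way to slow down or cancel the producer, which is exactly the backpressure and cancellation concern Bryn raises about Consumer.

```java
import java.util.Iterator;
import java.util.function.Consumer;

// Hypothetical helper: push results from a pull-based iterator on a background thread.
final class ThreadedPush {
    private ThreadedPush() {}

    // Starts draining the iterator on a new thread and returns without waiting.
    static <T> Thread subscribe(Iterator<T> results, Consumer<T> consumer) {
        Thread worker = new Thread(() -> {
            // Pull each result and push it to the consumer; no demand signal, no cancel.
            while (results.hasNext()) {
                consumer.accept(results.next());
            }
        }, "push-simulation");
        worker.start();
        return worker; // the caller is free immediately; join() only if completion matters
    }
}
```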
> >> 
> >> Next, here is my RxJava implementation.
> >> 
> >>    https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java#L59-L65
> >>    https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java#L66-L86
> >> 
> >> You can see how I turn a push-based subscription into a pull-based
> >> iteration using the good ol’ while()-block :).
> >> 
> >>    https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java#L98-L102
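A sketch of such a push-to-pull adapter. It swaps the busy while() loop for BlockingQueue.take() (a substitution, not the linked implementation), and PushToPullIterator is a hypothetical name. The thread calling hasNext() still blocks until a result or completion arrives, which is the behavior Bryn objected to, and the unbounded queue means the producer is never slowed down.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical adapter turning push-style callbacks into a blocking Iterator.
final class PushToPullIterator<T> implements Iterator<T> {
    private static final Object DONE = new Object(); // completion marker
    // Unbounded: no backpressure. LinkedBlockingQueue also rejects null items.
    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
    private Object next; // fetched by hasNext() but not yet returned by next()

    // Push side: give this to the producer as its onNext-style callback.
    Consumer<T> sink() { return queue::add; }

    // Push side: call exactly once when the producer finishes.
    void complete() { queue.add(DONE); }

    @Override
    public boolean hasNext() {
        if (next == null) {
            try {
                next = queue.take(); // parks the pulling thread until something arrives
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for a result", e);
            }
        }
        return next != DONE;
    }

    @Override
    @SuppressWarnings("unchecked")
    public T next() {
        if (!hasNext()) throw new NoSuchElementException();
        T result = (T) next;
        next = null;
        return result;
    }
}
```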
> >> 
> >> ——
> >> 
> >> What I need to do next is to redo the RxJava execution planner such that 
> >> nested traversals (e.g. map(out())) are subscription-based with the
> >> parent flowable. I don’t quite know how I will do it, but I believe I
> >> will have to write custom Publisher/Subscriber objects for use with 
> >> Flowable.compose() such that onNext() and onComplete() will be called 
> >> accordingly within the consumer.accept(). It will be tricky as I’m not too 
> >> good with low-level RxJava, but thems the breaks.
> >> 
> >> Please note that my push-based conceptual skills are not the sharpest so 
> >> if anyone has any recommendations, please advise.
> >> 
> >> Take care,
> >> Marko.
> >> 
> >> http://rredux.com
> > 
> 
> 

Perhaps exposing Flow on the Processor API is enough. As long as backpressure
and cancellation work, the rest of the config is an internal implementation
detail of the concrete RxJavaProcessor. I would not expose buffer size etc.
to end users.
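A minimal sketch of what exposing Flow could look like for a pull-based processor such as Pipes, assuming results come from an Iterator<T>: request(n) bounds how many results are emitted and cancel() stops iteration. IteratorPublisher is a hypothetical name; it emits synchronously on the requesting thread, supports a single subscriber, and is not a fully Reactive Streams compliant implementation. An RxJava-backed processor would more likely adapt its Flowable instead.

```java
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical adapter: exposes an Iterator as a Flow.Publisher so downstream
// demand (request(n)) and cancellation (cancel()) reach the source.
final class IteratorPublisher<T> implements Flow.Publisher<T> {
    private final Iterator<T> source;
    private final AtomicBoolean subscribed = new AtomicBoolean(false);

    IteratorPublisher(Iterator<T> source) { this.source = Objects.requireNonNull(source); }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        Objects.requireNonNull(subscriber);
        if (!subscribed.compareAndSet(false, true)) {
            // Single-use, like the iterator it wraps; onSubscribe must precede onError.
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override public void request(long n) { }
                @Override public void cancel() { }
            });
            subscriber.onError(new IllegalStateException("this publisher supports one subscriber"));
            return;
        }
        subscriber.onSubscribe(new IteratorSubscription(subscriber));
    }

    private final class IteratorSubscription implements Flow.Subscription {
        private final Flow.Subscriber<? super T> subscriber;
        private final AtomicLong demand = new AtomicLong();    // outstanding requested items
        private final AtomicInteger wip = new AtomicInteger(); // serializes the drain loop
        private volatile boolean cancelled;
        private boolean done; // only touched inside drain()

        IteratorSubscription(Flow.Subscriber<? super T> subscriber) { this.subscriber = subscriber; }

        @Override
        public void request(long n) {
            if (n <= 0) { // non-positive requests are a protocol error
                cancelled = true;
                subscriber.onError(new IllegalArgumentException("request must be positive: " + n));
                return;
            }
            demand.getAndUpdate(d -> d + n < 0 ? Long.MAX_VALUE : d + n); // saturate on overflow
            drain();
        }

        @Override
        public void cancel() { cancelled = true; }

        // Emits results while there is demand; re-entrant request() calls made from
        // onNext only add demand and let the already-running loop pick it up.
        private void drain() {
            if (wip.getAndIncrement() != 0) return;
            int missed = 1;
            do {
                while (!cancelled && !done) {
                    boolean hasItem = false;
                    T item = null;
                    try {
                        if (!source.hasNext()) {
                            done = true; // completion is signalled even without demand
                        } else if (demand.get() > 0) {
                            item = source.next();
                            hasItem = true;
                        }
                    } catch (RuntimeException e) { // route source failures to onError
                        done = true;
                        subscriber.onError(e);
                        break;
                    }
                    if (done) { subscriber.onComplete(); break; }
                    if (!hasItem) break; // backpressure: wait for the next request(n)
                    demand.decrementAndGet();
                    subscriber.onNext(item);
                }
                missed = wip.addAndGet(-missed);
            } while (missed != 0);
        }
    }
}
```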

Check out https://github.com/akarnokd/RxJava2Jdk9Interop

On a related note, I would shy away from using properties for much of the
configuration, as this was a general pain point for us in TP3.

Regarding ParallelFlowable not being necessary, remember that flatMap gives you 
parallelism.
https://www.nurkiewicz.com/2017/09/idiomatic-concurrency-flatmap-vs.html

From a database point of view, you will always be doing IO in a flatMap rather
than a map. So using parallel could actually make the traversal slower, as it
introduces context switches for low-cost non-blocking operations.

Bryn


