This is an automated email from the ASF dual-hosted git repository.

okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/tp4 by this push:
     new 467c543  first working version of parallel RxJava. This implementation 
provides the pattern for dealing with threading. Functions that maintain 
compilations (nested traversals) need to be ThreadLocal so that the 
compilation object isn't accessed by multiple threads. I thought I was 
going to have to use ThreadLocal throughout machine-core, but thankfully, no. It 
makes sense -- if your processor's 'map operator' is threaded, then the TP map 
function it wraps needs to ensure thread- [...]
467c543 is described below

commit 467c5439a19c23c2aa9d88bab973fd421556d16c
Author: Marko A. Rodriguez <okramma...@gmail.com>
AuthorDate: Mon Apr 8 07:46:24 2019 -0600

    first working version of parallel RxJava. This implementation provides the 
pattern for dealing with threading. Functions that maintain compilations 
(nested traversals) need to be ThreadLocal so that the compilation object 
isn't accessed by multiple threads. I thought I was going to have to use 
ThreadLocal throughout machine-core, but thankfully, no. It makes sense -- if 
your processor's 'map operator' is threaded, then the TP map function it wraps 
needs to ensure thread-safe use of th [...]
---
 .../machine/function/branch/BranchBranch.java      |   2 +-
 .../tinkerpop/machine/processor/pipes/Pipes.java   |  22 ++--
 .../machine/processor/rxjava/AbstractRxJava.java   |  70 +++++++++++
 .../machine/processor/rxjava/BranchFlow.java       |  14 ++-
 .../machine/processor/rxjava/FilterFlow.java       |   6 +-
 .../machine/processor/rxjava/MapFlow.java          |   6 +-
 .../machine/processor/rxjava/ParallelRxJava.java   | 134 +++++++++++++++++++++
 .../machine/processor/rxjava/RepeatEnd.java        |  64 ++++++++++
 .../machine/processor/rxjava/RepeatStart.java      |  63 ++++++++++
 .../machine/processor/rxjava/RxJavaProcessor.java  |  19 ++-
 .../rxjava/{RxJava.java => SerialRxJava.java}      | 125 +++----------------
 .../processor/rxjava/strategy/RxJavaStrategy.java  |   4 +-
 ...LocalTest.java => SimpleLocalParallelTest.java} |   8 +-
 ...leLocalTest.java => SimpleLocalSerialTest.java} |   4 +-
 ...RemoteTest.java => SimpleRemoteSerialTest.java} |   4 +-
 15 files changed, 408 insertions(+), 137 deletions(-)

diff --git 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/branch/BranchBranch.java
 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/branch/BranchBranch.java
index 165a87d..b548b8c 100644
--- 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/branch/BranchBranch.java
+++ 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/branch/BranchBranch.java
@@ -62,7 +62,7 @@ public final class BranchBranch<C, S, E> extends 
AbstractFunction<C> implements
             for (final Compilation<C, S, E> compilation : entry.getValue()) {
                 compilations.add(compilation.clone());
             }
-            clone.branches.put(entry.getKey().clone(), compilations);
+            clone.branches.put(null == entry.getKey() ? null : 
entry.getKey().clone(), compilations);
         }
         return clone;
     }
diff --git 
a/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/Pipes.java
 
b/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/Pipes.java
index 9ef86e1..e370c58 100644
--- 
a/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/Pipes.java
+++ 
b/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/Pipes.java
@@ -28,8 +28,8 @@ import org.apache.tinkerpop.machine.function.InitialFunction;
 import org.apache.tinkerpop.machine.function.MapFunction;
 import org.apache.tinkerpop.machine.function.ReduceFunction;
 import org.apache.tinkerpop.machine.function.branch.RepeatBranch;
-import org.apache.tinkerpop.machine.processor.pipes.util.InMemoryReducer;
 import org.apache.tinkerpop.machine.processor.Processor;
+import org.apache.tinkerpop.machine.processor.pipes.util.InMemoryReducer;
 import org.apache.tinkerpop.machine.traverser.Traverser;
 
 import java.util.ArrayList;
@@ -45,7 +45,7 @@ public final class Pipes<C, S, E> implements Processor<C, S, 
E> {
     private SourceStep<C, S> startStep;
 
     public Pipes(final Compilation<C, S, E> compilation) {
-        Step<C, ?, ?> previousStep = EmptyStep.instance();
+        Step<C, ?, S> previousStep = EmptyStep.instance();
         for (final CFunction<?> function : compilation.getFunctions()) {
             final Step nextStep;
             if (this.steps.isEmpty() && !(function instanceof 
InitialFunction)) {
@@ -55,22 +55,22 @@ public final class Pipes<C, S, E> implements Processor<C, 
S, E> {
             }
 
             if (function instanceof RepeatBranch)
-                nextStep = new RepeatStep(previousStep, (RepeatBranch<C, ?>) 
function);
+                nextStep = new RepeatStep<>(previousStep, (RepeatBranch<C, S>) 
function);
             else if (function instanceof BranchFunction)
-                nextStep = new BranchStep(previousStep, (BranchFunction<C, ?, 
?>) function);
+                nextStep = new BranchStep<>(previousStep, (BranchFunction<C, 
S, E>) function);
             else if (function instanceof FilterFunction)
-                nextStep = new FilterStep(previousStep, (FilterFunction<C, ?>) 
function);
+                nextStep = new FilterStep<>(previousStep, (FilterFunction<C, 
S>) function);
             else if (function instanceof FlatMapFunction)
-                nextStep = new FlatMapStep(previousStep, (FlatMapFunction<C, 
?, ?>) function);
+                nextStep = new FlatMapStep<>(previousStep, (FlatMapFunction<C, 
S, E>) function);
             else if (function instanceof MapFunction)
-                nextStep = new MapStep(previousStep, (MapFunction<C, ?, ?>) 
function);
+                nextStep = new MapStep<>(previousStep, (MapFunction<C, S, E>) 
function);
             else if (function instanceof InitialFunction)
-                nextStep = new InitialStep((InitialFunction<C, S>) function, 
compilation.getTraverserFactory());
+                nextStep = new InitialStep<>((InitialFunction<C, S>) function, 
compilation.getTraverserFactory());
             else if (function instanceof BarrierFunction)
-                nextStep = new BarrierStep(previousStep, (BarrierFunction) 
function, compilation.getTraverserFactory());
+                nextStep = new BarrierStep<>(previousStep, (BarrierFunction<C, 
S, E, Object>) function, compilation.getTraverserFactory());
             else if (function instanceof ReduceFunction)
-                nextStep = new ReduceStep(previousStep, (ReduceFunction<C, ?, 
?>) function,
-                        new InMemoryReducer((ReduceFunction<C, ?, ?>) 
function), compilation.getTraverserFactory());
+                nextStep = new ReduceStep<>(previousStep, (ReduceFunction<C, 
S, E>) function,
+                        new InMemoryReducer<>((ReduceFunction<C, S, E>) 
function), compilation.getTraverserFactory());
             else
                 throw new RuntimeException("You need a new step type:" + 
function);
 
diff --git 
a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java
 
b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java
new file mode 100644
index 0000000..9d05fd2
--- /dev/null
+++ 
b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.machine.processor.rxjava;
+
+import org.apache.tinkerpop.machine.bytecode.compiler.Compilation;
+import org.apache.tinkerpop.machine.processor.Processor;
+import org.apache.tinkerpop.machine.traverser.Traverser;
+import org.apache.tinkerpop.machine.traverser.TraverserSet;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public abstract class AbstractRxJava<C, S, E> implements Processor<C, S, E> {
+
+    static final int MAX_REPETITIONS = 8; // TODO: this needs to be a dynamic 
configuration
+
+    final AtomicBoolean alive = new AtomicBoolean(Boolean.TRUE);
+    boolean executed = false;
+    final TraverserSet<C, S> starts = new TraverserSet<>();
+    final TraverserSet<C, E> ends = new TraverserSet<>();
+    final Compilation<C, S, E> compilation;
+
+    AbstractRxJava(final Compilation<C, S, E> compilation) {
+        this.compilation = compilation;
+    }
+
+    @Override
+    public void addStart(final Traverser<C, S> traverser) {
+        this.starts.add(traverser);
+    }
+
+    @Override
+    public Traverser<C, E> next() {
+        this.prepareFlow();
+        return this.ends.remove();
+    }
+
+    @Override
+    public boolean hasNext() {
+        this.prepareFlow();
+        return !this.ends.isEmpty();
+    }
+
+    @Override
+    public void reset() {
+        this.starts.clear();
+        this.ends.clear();
+        this.executed = false;
+    }
+
+    protected abstract void prepareFlow();
+}
\ No newline at end of file
diff --git 
a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/BranchFlow.java
 
b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/BranchFlow.java
index ee3cebc..490b21c 100644
--- 
a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/BranchFlow.java
+++ 
b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/BranchFlow.java
@@ -31,16 +31,22 @@ import java.util.List;
  */
 final class BranchFlow<C, S> implements Function<Traverser<C, S>, List> {
 
-    private final List<Compilation<C, S, ?>> selectors;
+    private final ThreadLocal<List<Compilation<C, S, ?>>> selectors;
 
     BranchFlow(final BranchFunction<C, S, ?> function) {
-        this.selectors = new ArrayList<>(function.getBranches().keySet());
+        this.selectors = ThreadLocal.withInitial(() -> {
+            final List<Compilation<C, S, ?>> branches = new ArrayList<>();
+            for (final Compilation<C, S, ?> branch : 
function.getBranches().keySet()) {
+                branches.add(null == branch ? null : branch.clone());
+            }
+            return branches;
+        });
     }
 
     @Override
     public List apply(final Traverser<C, S> traverser) {
-        for (int i = 0; i < this.selectors.size(); i++) {
-            final Compilation<C, S, ?> selector = this.selectors.get(i);
+        for (int i = 0; i < this.selectors.get().size(); i++) {
+            final Compilation<C, S, ?> selector = this.selectors.get().get(i);
             if (null != selector) {
                 if (selector.filterTraverser(traverser))
                     return List.of(i, traverser);
diff --git 
a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/FilterFlow.java
 
b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/FilterFlow.java
index 1ca66db..ec686f7 100644
--- 
a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/FilterFlow.java
+++ 
b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/FilterFlow.java
@@ -27,14 +27,14 @@ import org.apache.tinkerpop.machine.traverser.Traverser;
  */
 final class FilterFlow<C, S> implements Predicate<Traverser<C, S>> {
 
-    private final FilterFunction<C, S> function;
+    private final ThreadLocal<FilterFunction<C, S>> function;
 
     FilterFlow(final FilterFunction<C, S> function) {
-        this.function = function;
+        this.function = ThreadLocal.withInitial(() -> (FilterFunction) 
function.clone());
     }
 
     @Override
     public boolean test(final Traverser<C, S> traverser) {
-        return this.function.test(traverser); // todo: make this 0/1-flatmap 
so traverser splitting is correct
+        return this.function.get().test(traverser); // todo: make this 
0/1-flatmap so traverser splitting is correct
     }
 }
diff --git 
a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/MapFlow.java
 
b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/MapFlow.java
index 4ea3691..57e934e 100644
--- 
a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/MapFlow.java
+++ 
b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/MapFlow.java
@@ -27,14 +27,14 @@ import org.apache.tinkerpop.machine.traverser.Traverser;
  */
 final class MapFlow<C, S, E> implements Function<Traverser<C, S>, Traverser<C, 
E>> {
 
-    private final MapFunction<C, S, E> function;
+    private final ThreadLocal<MapFunction<C, S, E>> function;
 
     MapFlow(final MapFunction<C, S, E> function) {
-        this.function = function;
+        this.function = ThreadLocal.withInitial(() -> 
(MapFunction)function.clone());
     }
 
     @Override
     public Traverser<C, E> apply(final Traverser<C, S> traverser) {
-        return traverser.map(this.function);
+        return traverser.map(this.function.get());
     }
 }
diff --git 
a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
 
b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
new file mode 100644
index 0000000..a55ae4c
--- /dev/null
+++ 
b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.machine.processor.rxjava;
+
+import io.reactivex.Flowable;
+import io.reactivex.parallel.ParallelFlowable;
+import io.reactivex.processors.PublishProcessor;
+import io.reactivex.schedulers.Schedulers;
+import org.apache.tinkerpop.machine.bytecode.compiler.Compilation;
+import org.apache.tinkerpop.machine.function.BarrierFunction;
+import org.apache.tinkerpop.machine.function.BranchFunction;
+import org.apache.tinkerpop.machine.function.CFunction;
+import org.apache.tinkerpop.machine.function.FilterFunction;
+import org.apache.tinkerpop.machine.function.FlatMapFunction;
+import org.apache.tinkerpop.machine.function.InitialFunction;
+import org.apache.tinkerpop.machine.function.MapFunction;
+import org.apache.tinkerpop.machine.function.ReduceFunction;
+import org.apache.tinkerpop.machine.function.branch.RepeatBranch;
+import org.apache.tinkerpop.machine.traverser.Traverser;
+import org.apache.tinkerpop.machine.traverser.TraverserFactory;
+import org.apache.tinkerpop.machine.util.IteratorUtils;
+import org.reactivestreams.Publisher;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class ParallelRxJava<C, S, E> extends AbstractRxJava<C, S, E> {
+
+    private final int threads;
+
+    ParallelRxJava(final Compilation<C, S, E> compilation, final int threads) {
+        super(compilation);
+        this.threads = threads;
+    }
+
+    @Override
+    protected void prepareFlow() {
+        if (!this.executed) {
+            ExecutorService threadPool = 
Executors.newFixedThreadPool(this.threads);
+            this.executed = true;
+            
ParallelRxJava.compile(ParallelFlowable.from(Flowable.fromIterable(this.starts)).runOn(Schedulers.from(threadPool)),
 this.compilation).
+                    doOnNext(this.ends::add).
+                    sequential().
+                    doOnComplete(() -> this.alive.set(Boolean.FALSE)).
+                    doFinally(threadPool::shutdown).
+                    blockingSubscribe();
+        }
+    }
+
+    // EXECUTION PLAN COMPILER
+
+    private static <C, S, E> ParallelFlowable<Traverser<C, E>> compile(final 
ParallelFlowable<Traverser<C, S>> source, final Compilation<C, S, E> 
compilation) {
+        final TraverserFactory<C> traverserFactory = 
compilation.getTraverserFactory();
+        ParallelFlowable<Traverser<C, E>> sink = (ParallelFlowable) source;
+        for (final CFunction<C> function : compilation.getFunctions()) {
+            sink = ParallelRxJava.extend(sink, function, traverserFactory);
+        }
+        return sink;
+    }
+
+    private static <C, S, E, B> ParallelFlowable<Traverser<C, E>> 
extend(ParallelFlowable<Traverser<C, S>> flow, final CFunction<C> function, 
final TraverserFactory<C> traverserFactory) {
+        if (function instanceof MapFunction)
+            return flow.map(new MapFlow<>((MapFunction<C, S, E>) function));
+        else if (function instanceof FilterFunction) {
+            return (ParallelFlowable) flow.filter(new 
FilterFlow<>((FilterFunction<C, S>) function));
+        } else if (function instanceof FlatMapFunction) {
+            return flow.sequential().flatMapIterable(new 
FlatMapFlow<>((FlatMapFunction<C, S, E>) function)).parallel();
+        } else if (function instanceof InitialFunction) {
+            return Flowable.fromIterable(() -> 
IteratorUtils.map(((InitialFunction<C, E>) function).get(), s -> 
traverserFactory.create(function, s))).parallel();
+        } else if (function instanceof ReduceFunction) {
+            final ReduceFunction<C, S, E> reduceFunction = (ReduceFunction<C, 
S, E>) function;
+            return 
flow.sequential().reduce(traverserFactory.create(reduceFunction, 
reduceFunction.getInitialValue()), new 
Reducer<>(reduceFunction)).toFlowable().parallel();
+        } else if (function instanceof BarrierFunction) {
+            final BarrierFunction<C, S, E, B> barrierFunction = 
(BarrierFunction<C, S, E, B>) function;
+            return flow.sequential().reduce(barrierFunction.getInitialValue(), 
new Barrier<>(barrierFunction)).toFlowable().flatMapIterable(new 
BarrierFlow<>(barrierFunction, traverserFactory)).parallel();
+        } else if (function instanceof BranchFunction) {
+            final ParallelFlowable<List> selectorFlow = flow.map(new 
BranchFlow<>((BranchFunction<C, S, B>) function));
+            final List<Publisher<Traverser<C, E>>> branchFlows = new 
ArrayList<>();
+            int branchCounter = 0;
+            for (final Map.Entry<Compilation<C, S, ?>, List<Compilation<C, S, 
E>>> branches : ((BranchFunction<C, S, E>) function).getBranches().entrySet()) {
+                final int branchId = null == branches.getKey() ? -1 : 
branchCounter;
+                branchCounter++;
+                for (final Compilation<C, S, E> branch : branches.getValue()) {
+                    branchFlows.add(compile(selectorFlow.
+                                    filter(list -> 
list.get(0).equals(branchId)).
+                                    map(list -> (Traverser<C, S>) list.get(1)),
+                            branch).sequential());
+                }
+            }
+            return PublishProcessor.merge(branchFlows).parallel();
+        } else if (function instanceof RepeatBranch) {
+            final RepeatBranch<C, S> repeatBranch = (RepeatBranch<C, S>) 
function;
+            final List<Publisher<Traverser<C, S>>> outputs = new ArrayList<>();
+            ParallelFlowable<List> selectorFlow;
+            for (int i = 0; i < MAX_REPETITIONS; i++) {
+                if (repeatBranch.hasStartPredicates()) {
+                    selectorFlow = flow.sequential().flatMapIterable(new 
RepeatStart<>(repeatBranch)).parallel();
+                    outputs.add(selectorFlow.filter(list -> 
list.get(0).equals(0)).map(list -> (Traverser<C, S>) list.get(1)).sequential());
+                    flow = compile(selectorFlow.filter(list -> 
list.get(0).equals(1)).map(list -> (Traverser<C, S>) list.get(1)), 
repeatBranch.getRepeat());
+                } else {
+                    flow = compile(flow, repeatBranch.getRepeat());
+                }
+                selectorFlow = flow.sequential().flatMapIterable(new 
RepeatEnd<>(repeatBranch)).parallel();
+                outputs.add(selectorFlow.sequential().filter(list -> 
list.get(0).equals(0)).map(list -> (Traverser<C, S>) list.get(1)));
+                flow = selectorFlow.filter(list -> 
list.get(0).equals(1)).map(list -> (Traverser<C, S>) list.get(1));
+            }
+
+            return (ParallelFlowable) 
PublishProcessor.merge(outputs).parallel();
+        }
+        throw new RuntimeException("Need a new execution plan step: " + 
function);
+    }
+}
diff --git 
a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RepeatEnd.java
 
b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RepeatEnd.java
new file mode 100644
index 0000000..9086f32
--- /dev/null
+++ 
b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RepeatEnd.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.machine.processor.rxjava;
+
+import io.reactivex.functions.Function;
+import org.apache.tinkerpop.machine.function.branch.RepeatBranch;
+import org.apache.tinkerpop.machine.traverser.Traverser;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class RepeatEnd<C, S> implements Function<Traverser<C, S>, 
List<List>> {
+
+    private final RepeatBranch<C, S> repeatBranch;
+
+    RepeatEnd(final RepeatBranch<C, S> repeatBranch) {
+        this.repeatBranch = repeatBranch;
+    }
+
+    @Override
+    public List<List> apply(final Traverser<C, S> traverser) {
+        final Traverser<C,S> t = traverser.repeatLoop(this.repeatBranch);
+        final List<List> list = new ArrayList<>();
+        if (this.repeatBranch.hasEndPredicates()) {
+            if (3 == this.repeatBranch.getUntilLocation()) {
+                if (this.repeatBranch.getUntil().filterTraverser(t)) {
+                    list.add(List.of(0, t.repeatDone(this.repeatBranch)));
+                } else if (4 == this.repeatBranch.getEmitLocation() && 
this.repeatBranch.getEmit().filterTraverser(t)) {
+                    list.add(List.of(0, t.repeatDone(this.repeatBranch)));
+                    list.add(List.of(1, t));
+                } else
+                    list.add(List.of(1, t));
+            } else if (3 == this.repeatBranch.getEmitLocation()) {
+                if (this.repeatBranch.getEmit().filterTraverser(t))
+                    list.add(List.of(0, t.repeatDone(this.repeatBranch)));
+                if (4 == this.repeatBranch.getUntilLocation() && 
this.repeatBranch.getUntil().filterTraverser(t))
+                    list.add(List.of(0, t.repeatDone(this.repeatBranch)));
+                else
+                    list.add(List.of(1, t));
+            }
+        } else
+            list.add(List.of(1, t));
+        return list;
+    }
+}
diff --git 
a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RepeatStart.java
 
b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RepeatStart.java
new file mode 100644
index 0000000..5dea785
--- /dev/null
+++ 
b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RepeatStart.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.machine.processor.rxjava;
+
+import io.reactivex.functions.Function;
+import org.apache.tinkerpop.machine.function.branch.RepeatBranch;
+import org.apache.tinkerpop.machine.traverser.Traverser;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class RepeatStart<C, S> implements Function<Traverser<C, S>, 
List<List>> {
+
+    private final RepeatBranch<C, S> repeatBranch;
+
+    RepeatStart(final RepeatBranch<C, S> repeatBranch) {
+        this.repeatBranch = repeatBranch;
+    }
+
+    @Override
+    public List<List> apply(final Traverser<C, S> traverser) {
+        final List<List> list = new ArrayList<>();
+        if (this.repeatBranch.hasStartPredicates()) {
+            if (1 == this.repeatBranch.getUntilLocation()) {
+                if (this.repeatBranch.getUntil().filterTraverser(traverser)) {
+                    list.add(List.of(0, 
traverser.repeatDone(this.repeatBranch)));
+                } else if (2 == this.repeatBranch.getEmitLocation() && 
this.repeatBranch.getEmit().filterTraverser(traverser)) {
+                    list.add(List.of(1, traverser));
+                    list.add(List.of(0, 
traverser.repeatDone(this.repeatBranch)));
+                } else
+                    list.add(List.of(1, traverser));
+            } else if (1 == this.repeatBranch.getEmitLocation()) {
+                if (this.repeatBranch.getEmit().filterTraverser(traverser))
+                    list.add(List.of(0, 
traverser.repeatDone(this.repeatBranch)));
+                if (2 == this.repeatBranch.getUntilLocation() && 
this.repeatBranch.getUntil().filterTraverser(traverser)) {
+                    list.add(List.of(0, 
traverser.repeatDone(this.repeatBranch)));
+                } else
+                    list.add(List.of(1, traverser));
+            }
+        } else
+            list.add(List.of(1, traverser));
+        return list;
+    }
+}
diff --git 
a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaProcessor.java
 
b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaProcessor.java
index a57954e..b940e51 100644
--- 
a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaProcessor.java
+++ 
b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaProcessor.java
@@ -24,15 +24,32 @@ import 
org.apache.tinkerpop.machine.processor.ProcessorFactory;
 import org.apache.tinkerpop.machine.processor.rxjava.strategy.RxJavaStrategy;
 import org.apache.tinkerpop.machine.strategy.Strategy;
 
+import java.util.Collections;
+import java.util.Map;
 import java.util.Set;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
 public final class RxJavaProcessor implements ProcessorFactory {
+
+    public static final String RXJAVA_THREADS = "rxjava.threads";
+
+    private final Map<String, Object> configuration;
+
+    public RxJavaProcessor() {
+        this.configuration = Collections.emptyMap();
+    }
+
+    public RxJavaProcessor(final Map<String, Object> configuration) {
+        this.configuration = configuration;
+    }
+
     @Override
     public <C, S, E> Processor<C, S, E> mint(final Compilation<C, S, E> 
compilation) {
-        return new RxJava<>(compilation);
+        return this.configuration.containsKey(RXJAVA_THREADS) ?
+                new ParallelRxJava<>(compilation, (int) 
this.configuration.get(RXJAVA_THREADS)) :
+                new SerialRxJava<>(compilation);
     }
 
     @Override
diff --git 
a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RxJava.java
 
b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
similarity index 50%
rename from 
java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RxJava.java
rename to 
java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
index 214004f..40fc11e 100644
--- 
a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RxJava.java
+++ 
b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
@@ -19,6 +19,7 @@
 package org.apache.tinkerpop.machine.processor.rxjava;
 
 import io.reactivex.Flowable;
+import io.reactivex.processors.PublishProcessor;
 import org.apache.tinkerpop.machine.bytecode.compiler.Compilation;
 import org.apache.tinkerpop.machine.function.BarrierFunction;
 import org.apache.tinkerpop.machine.function.BranchFunction;
@@ -29,72 +30,32 @@ import org.apache.tinkerpop.machine.function.InitialFunction;
 import org.apache.tinkerpop.machine.function.MapFunction;
 import org.apache.tinkerpop.machine.function.ReduceFunction;
 import org.apache.tinkerpop.machine.function.branch.RepeatBranch;
-import org.apache.tinkerpop.machine.processor.Processor;
 import org.apache.tinkerpop.machine.traverser.Traverser;
 import org.apache.tinkerpop.machine.traverser.TraverserFactory;
-import org.apache.tinkerpop.machine.traverser.TraverserSet;
 import org.apache.tinkerpop.machine.util.IteratorUtils;
 import org.reactivestreams.Publisher;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class RxJava<C, S, E> implements Processor<C, S, E> {
+public final class SerialRxJava<C, S, E> extends AbstractRxJava<C, S, E> {
 
-    private static final int MAX_REPETITIONS = 8; // TODO: this needs to be a dynamic configuration
-
-    private final AtomicBoolean alive = new AtomicBoolean(Boolean.TRUE);
-    private boolean executed = false;
-    private final TraverserSet<C, S> starts = new TraverserSet<>();
-    private final TraverserSet<C, E> ends = new TraverserSet<>();
-    private final Compilation<C, S, E> compilation;
-
-    public RxJava(final Compilation<C, S, E> compilation) {
-        this.compilation = compilation;
-    }
-
-    @Override
-    public void addStart(final Traverser<C, S> traverser) {
-        this.starts.add(traverser);
-    }
-
-    @Override
-    public Traverser<C, E> next() {
-        this.prepareFlow();
-        return this.ends.remove();
-    }
-
-    @Override
-    public boolean hasNext() {
-        this.prepareFlow();
-        return !this.ends.isEmpty();
+    public SerialRxJava(final Compilation<C, S, E> compilation) {
+        super(compilation);
     }
 
     @Override
-    public void reset() {
-        this.starts.clear();
-        this.ends.clear();
-        this.executed = false;
-    }
-
-    private void prepareFlow() {
+    protected void prepareFlow() {
         if (!this.executed) {
             this.executed = true;
-            RxJava.compile(Flowable.fromIterable(this.starts), this.compilation).
+            SerialRxJava.compile(Flowable.fromIterable(this.starts), this.compilation).
                     doOnNext(this.ends::add).
                     doOnComplete(() -> this.alive.set(Boolean.FALSE)).
-                    subscribe();
-        }
-        if (!this.ends.isEmpty())
-            return;
-        while (this.alive.get()) {
-            if (!this.ends.isEmpty())
-                return;
+                    blockingSubscribe();
         }
     }
 
@@ -104,7 +65,7 @@ public final class RxJava<C, S, E> implements Processor<C, S, E> {
         final TraverserFactory<C> traverserFactory = compilation.getTraverserFactory();
         Flowable<Traverser<C, E>> sink = (Flowable) source;
         for (final CFunction<C> function : compilation.getFunctions()) {
-            sink = RxJava.extend(sink, function, traverserFactory);
+            sink = SerialRxJava.extend(sink, function, traverserFactory);
         }
         return sink;
     }
@@ -138,72 +99,24 @@ public final class RxJava<C, S, E> implements Processor<C, S, E> {
                             branch));
                 }
             }
-            Flowable<Traverser<C, E>> sink = (Flowable) flow.filter(t -> false); // branches are the only outputs
-            for (final Publisher<Traverser<C, E>> branchFlow : branchFlows) {
-                sink = sink.mergeWith(branchFlow);
-            }
-            return sink;
+            return PublishProcessor.merge(branchFlows);
         } else if (function instanceof RepeatBranch) {
             final RepeatBranch<C, S> repeatBranch = (RepeatBranch<C, S>) function;
             final List<Publisher<Traverser<C, S>>> outputs = new ArrayList<>();
+            Flowable<List> selectorFlow;
             for (int i = 0; i < MAX_REPETITIONS; i++) {
-                Flowable<List> selectorFlow = flow.flatMapIterable(t -> {
-                    final List<List> list = new ArrayList<>();
-                    if (repeatBranch.hasStartPredicates()) {
-                        if (1 == repeatBranch.getUntilLocation()) {
-                            if (repeatBranch.getUntil().filterTraverser(t)) {
-                                list.add(List.of(0, t.repeatDone(repeatBranch)));
-                            } else if (2 == repeatBranch.getEmitLocation() && repeatBranch.getEmit().filterTraverser(t)) {
-                                list.add(List.of(1, t));
-                                list.add(List.of(0, t.repeatDone(repeatBranch)));
-                            } else
-                                list.add(List.of(1, t));
-                        } else if (1 == repeatBranch.getEmitLocation()) {
-                            if (repeatBranch.getEmit().filterTraverser(t))
-                                list.add(List.of(0, t.repeatDone(repeatBranch)));
-                            if (2 == repeatBranch.getUntilLocation() && repeatBranch.getUntil().filterTraverser(t)) {
-                                list.add(List.of(0, t.repeatDone(repeatBranch)));
-                            } else
-                                list.add(List.of(1, t));
-                        }
-                    } else
-                        list.add(List.of(1, t));
-                    return list;
-                });
-                outputs.add(selectorFlow.filter(list -> list.get(0).equals(0)).map(list -> (Traverser<C, S>) list.get(1)));
-                flow = compile(selectorFlow.filter(list -> list.get(0).equals(1)).map(list -> (Traverser<C, S>) list.get(1)), repeatBranch.getRepeat());
-                selectorFlow = flow.flatMapIterable(t -> {
-                    t = t.repeatLoop(repeatBranch);
-                    final List<List> list = new ArrayList<>();
-                    if (repeatBranch.hasEndPredicates()) {
-                        if (3 == repeatBranch.getUntilLocation()) {
-                            if (repeatBranch.getUntil().filterTraverser(t)) {
-                                list.add(List.of(0, t.repeatDone(repeatBranch)));
-                            } else if (4 == repeatBranch.getEmitLocation() && repeatBranch.getEmit().filterTraverser(t)) {
-                                list.add(List.of(0, t.repeatDone(repeatBranch)));
-                                list.add(List.of(1, t));
-                            } else
-                                list.add(List.of(1, t));
-                        } else if (3 == repeatBranch.getEmitLocation()) {
-                            if (repeatBranch.getEmit().filterTraverser(t))
-                                list.add(List.of(0, t.repeatDone(repeatBranch)));
-                            if (4 == repeatBranch.getUntilLocation() && repeatBranch.getUntil().filterTraverser(t))
-                                list.add(List.of(0, t.repeatDone(repeatBranch)));
-                            else
-                                list.add(List.of(1, t));
-                        }
-                    } else
-                        list.add(List.of(1, t));
-                    return list;
-                });
+                if (repeatBranch.hasStartPredicates()) {
+                    selectorFlow = flow.flatMapIterable(new RepeatStart<>(repeatBranch));
+                    outputs.add(selectorFlow.filter(list -> list.get(0).equals(0)).map(list -> (Traverser<C, S>) list.get(1)));
+                    flow = compile(selectorFlow.filter(list -> list.get(0).equals(1)).map(list -> (Traverser<C, S>) list.get(1)), repeatBranch.getRepeat());
+                } else {
+                    flow = compile(flow, repeatBranch.getRepeat());
+                }
+                selectorFlow = flow.flatMapIterable(new RepeatEnd<>(repeatBranch));
                 outputs.add(selectorFlow.filter(list -> list.get(0).equals(0)).map(list -> (Traverser<C, S>) list.get(1)));
                 flow = selectorFlow.filter(list -> list.get(0).equals(1)).map(list -> (Traverser<C, S>) list.get(1));
             }
-            Flowable<Traverser<C, S>> sink = flow.filter(t -> false); // branches are the only outputs
-            for (final Publisher<Traverser<C, S>> output : outputs) {
-                sink = sink.mergeWith(output);
-            }
-            return (Flowable) sink;
+            return (Flowable) PublishProcessor.merge(outputs);
         }
         throw new RuntimeException("Need a new execution plan step: " + function);
     }
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/strategy/RxJavaStrategy.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/strategy/RxJavaStrategy.java
index 85d3290..ed33a98 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/strategy/RxJavaStrategy.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/strategy/RxJavaStrategy.java
@@ -25,6 +25,8 @@ import org.apache.tinkerpop.machine.processor.rxjava.RxJavaProcessor;
 import org.apache.tinkerpop.machine.strategy.AbstractStrategy;
 import org.apache.tinkerpop.machine.strategy.Strategy;
 
+import java.util.Map;
+
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
@@ -32,7 +34,7 @@ public class RxJavaStrategy extends AbstractStrategy<Strategy.ProviderStrategy>
     @Override
     public <C> void apply(final Bytecode<C> bytecode) {
         if (!BytecodeUtil.hasSourceInstruction(bytecode, CoreCompiler.Symbols.WITH_PROCESSOR)) {
-            bytecode.addSourceInstruction(CoreCompiler.Symbols.WITH_PROCESSOR, RxJavaProcessor.class);
+            bytecode.addSourceInstruction(CoreCompiler.Symbols.WITH_PROCESSOR, RxJavaProcessor.class, Map.of(RxJavaProcessor.RXJAVA_THREADS, 10)); // TODO: need root in strategies
         }
     }
 }
diff --git a/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleLocalTest.java b/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleLocalParallelTest.java
similarity index 87%
copy from java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleLocalTest.java
copy to java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleLocalParallelTest.java
index 246d486..59d73ec 100644
--- a/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleLocalTest.java
+++ b/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleLocalParallelTest.java
@@ -23,18 +23,20 @@ import org.apache.tinkerpop.machine.bytecode.Bytecode;
 import org.apache.tinkerpop.machine.bytecode.compiler.CoreCompiler;
 import org.apache.tinkerpop.machine.species.LocalMachine;
 
+import java.util.Map;
+
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-class SimpleLocalTest extends SimpleTestSuite {
+public class SimpleLocalParallelTest extends SimpleTestSuite {
 
     private final static Bytecode<Long> BYTECODE = new Bytecode<>();
 
     static {
-        BYTECODE.addSourceInstruction(CoreCompiler.Symbols.WITH_PROCESSOR, RxJavaProcessor.class);
+        BYTECODE.addSourceInstruction(CoreCompiler.Symbols.WITH_PROCESSOR, RxJavaProcessor.class, Map.of(RxJavaProcessor.RXJAVA_THREADS, 10));
     }
 
-    SimpleLocalTest() {
+    SimpleLocalParallelTest() {
         super(LocalMachine.open(), BYTECODE);
     }
 
diff --git a/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleLocalTest.java b/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleLocalSerialTest.java
similarity index 94%
rename from java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleLocalTest.java
rename to java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleLocalSerialTest.java
index 246d486..5afa1e7 100644
--- a/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleLocalTest.java
+++ b/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleLocalSerialTest.java
@@ -26,7 +26,7 @@ import org.apache.tinkerpop.machine.species.LocalMachine;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-class SimpleLocalTest extends SimpleTestSuite {
+class SimpleLocalSerialTest extends SimpleTestSuite {
 
     private final static Bytecode<Long> BYTECODE = new Bytecode<>();
 
@@ -34,7 +34,7 @@ class SimpleLocalTest extends SimpleTestSuite {
         BYTECODE.addSourceInstruction(CoreCompiler.Symbols.WITH_PROCESSOR, RxJavaProcessor.class);
     }
 
-    SimpleLocalTest() {
+    SimpleLocalSerialTest() {
         super(LocalMachine.open(), BYTECODE);
     }
 
diff --git a/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleRemoteTest.java b/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleRemoteSerialTest.java
similarity index 94%
rename from java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleRemoteTest.java
rename to java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleRemoteSerialTest.java
index 6aa9c68..a8d1e83 100644
--- a/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleRemoteTest.java
+++ b/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleRemoteSerialTest.java
@@ -28,7 +28,7 @@ import org.junit.jupiter.api.AfterAll;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class SimpleRemoteTest extends SimpleTestSuite {
+public class SimpleRemoteSerialTest extends SimpleTestSuite {
 
     private final static Bytecode<Long> BYTECODE = new Bytecode<>();
     private static MachineServer SERVER = new MachineServer(7777);
@@ -37,7 +37,7 @@ public class SimpleRemoteTest extends SimpleTestSuite {
         BYTECODE.addSourceInstruction(CoreCompiler.Symbols.WITH_PROCESSOR, RxJavaProcessor.class);
     }
 
-    SimpleRemoteTest() {
+    SimpleRemoteSerialTest() {
         super(RemoteMachine.open(6666, "localhost", 7777), BYTECODE);
     }
 

Reply via email to