This is an automated email from the ASF dual-hosted git repository.

okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/tp4 by this push:
     new 11ceb5b  Figured out how to compile a Flow once and reuse it over and 
over again. This is great for nested traversals, where a single traverser is 
inserted and a result is returned, and this happens over and over again for 
each incoming traverser. By 'caching' the Flow, we save on compilation costs.
11ceb5b is described below

commit 11ceb5baa725384893aa05d5c71bbaadc9ee604d
Author: Marko A. Rodriguez <okramma...@gmail.com>
AuthorDate: Thu Apr 11 08:09:16 2019 -0600

    Figured out how to compile a Flow once and reuse it over and over again. 
This is great for nested traversals, where a single traverser is inserted and 
a result is returned, and this happens over and over again for each incoming 
traverser. By 'caching' the Flow, we save on compilation costs.
---
 .../apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java   | 3 ++-
 .../apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java   | 5 ++++-
 .../org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java | 6 +++++-
 3 files changed, 11 insertions(+), 3 deletions(-)

diff --git 
a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java
 
b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java
index c9b35c3..6c478f4 100644
--- 
a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java
+++ 
b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java
@@ -60,9 +60,10 @@ public abstract class AbstractRxJava<C, S, E> implements 
Processor<C, S, E> {
 
     @Override
     public void reset() {
+        if (null != this.disposable)
+            this.disposable.dispose();
         this.starts.clear();
         this.ends.clear();
-        this.disposable = null;
         this.executed = false;
     }
 
diff --git 
a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
 
b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
index d278cbd..c1cda6f 100644
--- 
a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
+++ 
b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
@@ -50,6 +50,7 @@ public final class ParallelRxJava<C, S, E> extends 
AbstractRxJava<C, S, E> {
 
     private final ExecutorService threadPool;
     private final String bytecodeId;
+    private final ParallelFlowable<Traverser<C, E>> flowable;
 
     ParallelRxJava(final Compilation<C, S, E> compilation, final 
ExecutorService threadPool) {
         super(compilation);
@@ -57,13 +58,15 @@ public final class ParallelRxJava<C, S, E> extends 
AbstractRxJava<C, S, E> {
         this.bytecodeId = compilation.getBytecode().getParent().isEmpty() ?
                 (String) 
BytecodeUtil.getSourceInstructions(compilation.getBytecode(), 
RxJavaProcessor.RX_ROOT_BYTECODE_ID).get(0).args()[0] :
                 null;
+        // compile once and use many times
+        this.flowable = 
this.compile(ParallelFlowable.from(Flowable.fromIterable(this.starts)).runOn(Schedulers.from(this.threadPool)),
 this.compilation);
     }
 
     @Override
     protected void prepareFlow() {
         if (!this.executed) {
             this.executed = true;
-            this.disposable = 
this.compile(ParallelFlowable.from(Flowable.fromIterable(this.starts)).runOn(Schedulers.from(this.threadPool)),
 this.compilation).
+            this.disposable = this.flowable.
                     doOnNext(this.ends::add).
                     sequential().
                     doFinally(() -> {
diff --git 
a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
 
b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
index e68b2ad..5da1091 100644
--- 
a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
+++ 
b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
@@ -45,15 +45,19 @@ import java.util.Map;
  */
 public final class SerialRxJava<C, S, E> extends AbstractRxJava<C, S, E> {
 
+    private final Flowable<Traverser<C, E>> flowable;
+
     SerialRxJava(final Compilation<C, S, E> compilation) {
         super(compilation);
+        // compile once and reuse many times
+        this.flowable = 
SerialRxJava.compile(Flowable.fromIterable(this.starts), this.compilation);
     }
 
     @Override
     protected void prepareFlow() {
         if (!this.executed) {
             this.executed = true;
-            this.disposable = 
SerialRxJava.compile(Flowable.fromIterable(this.starts), this.compilation).
+            this.disposable = this.flowable.
                     doOnNext(this.ends::add).
                     subscribeOn(Schedulers.newThread()).subscribe(); // don't 
block the execution so results can be streamed back in real-time
         }

Reply via email to