This is an automated email from the ASF dual-hosted git repository.

okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/tp4 by this push:
     new 5784db0  added RepeatStartFn and RepeatEndFn to Beam to wrap the repeat-loop. This is necessary to handle the case when until() and emit() are split on opposite sides of repeat().
5784db0 is described below

commit 5784db039cc6ce882529de65165fc9e8ae54f84d
Author: Marko A. Rodriguez <okramma...@gmail.com>
AuthorDate: Sun Mar 17 06:45:06 2019 -0600

    added RepeatStartFn and RepeatEndFn to Beam to wrap the repeat-loop. This
    is necessary to handle the case when until() and emit() are split on
    opposite sides of repeat().
---
 .../beam/{RepeatFn.java => RepeatEndFn.java}       | 40 ++++------------------
 .../beam/{RepeatFn.java => RepeatStartFn.java}     | 38 ++++----------------
 .../tinkerpop/machine/beam/util/TopologyUtil.java  | 15 +++++---
 .../apache/tinkerpop/machine/beam/BeamTest.java    |  5 +++
 .../apache/tinkerpop/machine/pipes/PipesTest.java  |  5 +++
 5 files changed, 34 insertions(+), 69 deletions(-)

diff --git 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/RepeatFn.java
 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/RepeatEndFn.java
similarity index 69%
copy from 
java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/RepeatFn.java
copy to 
java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/RepeatEndFn.java
index 90fdb3b..56e34d8 100644
--- 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/RepeatFn.java
+++ 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/RepeatEndFn.java
@@ -27,7 +27,7 @@ import org.apache.tinkerpop.machine.traverser.Traverser;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class RepeatFn<C, S> extends AbstractFn<C, S, S> {
+public class RepeatEndFn<C, S> extends AbstractFn<C, S, S> {
 
     private final int untilLocation;
     private final int emitLocation;
@@ -36,13 +36,12 @@ public class RepeatFn<C, S> extends AbstractFn<C, S, S> {
     private final TupleTag<Traverser<C, S>> repeatDone;
     private final TupleTag<Traverser<C, S>> repeatLoop;
     private final boolean deadEnd;
-    private final boolean first;
 
 
-    public RepeatFn(final RepeatBranch<C, S> repeatBranch,
-                    final TupleTag<Traverser<C, S>> repeatDone,
-                    final TupleTag<Traverser<C, S>> repeatLoop,
-                    final boolean deadEnd, final boolean first) {
+    public RepeatEndFn(final RepeatBranch<C, S> repeatBranch,
+                       final TupleTag<Traverser<C, S>> repeatDone,
+                       final TupleTag<Traverser<C, S>> repeatLoop,
+                       final boolean deadEnd) {
         super(repeatBranch);
         this.untilLocation = repeatBranch.getUntilLocation();
         this.untilCompilation = repeatBranch.getUntil();
@@ -51,43 +50,19 @@ public class RepeatFn<C, S> extends AbstractFn<C, S, S> {
         this.repeatDone = repeatDone;
         this.repeatLoop = repeatLoop;
         this.deadEnd = deadEnd;
-        this.first = first;
     }
 
     @ProcessElement
     public void processElement(final @DoFn.Element Traverser<C, S> traverser, 
final MultiOutputReceiver out) {
-        if (1 == this.untilLocation) {
-            if (this.untilCompilation.filterTraverser(traverser.clone())) {
-                out.get(this.repeatDone).output(traverser.clone());
-            } else if (2 == this.emitLocation && 
this.emitCompilation.filterTraverser(traverser.clone())) {
-                out.get(this.repeatDone).output(traverser.clone());
-                out.get(this.repeatLoop).output(traverser.clone());
-            } else {
-                out.get(this.repeatLoop).output(traverser.clone());
-            }
-            return;
-        } else if (1 == this.emitLocation) {
-            if (this.emitCompilation.filterTraverser(traverser.clone()))
-                out.get(this.repeatDone).output(traverser.clone());
-            if (2 == this.untilLocation && 
this.untilCompilation.filterTraverser(traverser.clone()))
-                out.get(this.repeatDone).output(traverser.clone());
-            else
-                out.get(this.repeatLoop).output(traverser.clone());
-            return;
-        } else if (this.first) {
-            out.get(this.repeatLoop).output(traverser.clone());
-            return;
-        }
-
-
         if (3 == this.untilLocation) {
             if (this.untilCompilation.filterTraverser(traverser.clone())) {
                 out.get(this.repeatDone).output(traverser.clone());
             } else if (4 == this.emitLocation && 
this.emitCompilation.filterTraverser(traverser.clone())) {
                 out.get(this.repeatDone).output(traverser.clone());
                 out.get(this.repeatLoop).output(traverser.clone());
-            } else
+            } else {
                 out.get(this.repeatLoop).output(traverser.clone());
+            }
         } else if (3 == this.emitLocation) {
             if (this.emitCompilation.filterTraverser(traverser.clone()))
                 out.get(this.repeatDone).output(traverser.clone());
@@ -98,7 +73,6 @@ public class RepeatFn<C, S> extends AbstractFn<C, S, S> {
         } else {
             out.get(this.repeatLoop).output(traverser.clone());
         }
-
     }
 
     /*private void outputDone(final Traverser<C, S> traverser) {
diff --git 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/RepeatFn.java
 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/RepeatStartFn.java
similarity index 69%
rename from 
java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/RepeatFn.java
rename to 
java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/RepeatStartFn.java
index 90fdb3b..b11a078 100644
--- 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/RepeatFn.java
+++ 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/RepeatStartFn.java
@@ -27,7 +27,7 @@ import org.apache.tinkerpop.machine.traverser.Traverser;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class RepeatFn<C, S> extends AbstractFn<C, S, S> {
+public class RepeatStartFn<C, S> extends AbstractFn<C, S, S> {
 
     private final int untilLocation;
     private final int emitLocation;
@@ -36,13 +36,12 @@ public class RepeatFn<C, S> extends AbstractFn<C, S, S> {
     private final TupleTag<Traverser<C, S>> repeatDone;
     private final TupleTag<Traverser<C, S>> repeatLoop;
     private final boolean deadEnd;
-    private final boolean first;
 
 
-    public RepeatFn(final RepeatBranch<C, S> repeatBranch,
-                    final TupleTag<Traverser<C, S>> repeatDone,
-                    final TupleTag<Traverser<C, S>> repeatLoop,
-                    final boolean deadEnd, final boolean first) {
+    public RepeatStartFn(final RepeatBranch<C, S> repeatBranch,
+                         final TupleTag<Traverser<C, S>> repeatDone,
+                         final TupleTag<Traverser<C, S>> repeatLoop,
+                         final boolean deadEnd) {
         super(repeatBranch);
         this.untilLocation = repeatBranch.getUntilLocation();
         this.untilCompilation = repeatBranch.getUntil();
@@ -51,7 +50,6 @@ public class RepeatFn<C, S> extends AbstractFn<C, S, S> {
         this.repeatDone = repeatDone;
         this.repeatLoop = repeatLoop;
         this.deadEnd = deadEnd;
-        this.first = first;
     }
 
     @ProcessElement
@@ -65,7 +63,6 @@ public class RepeatFn<C, S> extends AbstractFn<C, S, S> {
             } else {
                 out.get(this.repeatLoop).output(traverser.clone());
             }
-            return;
         } else if (1 == this.emitLocation) {
             if (this.emitCompilation.filterTraverser(traverser.clone()))
                 out.get(this.repeatDone).output(traverser.clone());
@@ -73,32 +70,9 @@ public class RepeatFn<C, S> extends AbstractFn<C, S, S> {
                 out.get(this.repeatDone).output(traverser.clone());
             else
                 out.get(this.repeatLoop).output(traverser.clone());
-            return;
-        } else if (this.first) {
-            out.get(this.repeatLoop).output(traverser.clone());
-            return;
-        }
-
-
-        if (3 == this.untilLocation) {
-            if (this.untilCompilation.filterTraverser(traverser.clone())) {
-                out.get(this.repeatDone).output(traverser.clone());
-            } else if (4 == this.emitLocation && 
this.emitCompilation.filterTraverser(traverser.clone())) {
-                out.get(this.repeatDone).output(traverser.clone());
-                out.get(this.repeatLoop).output(traverser.clone());
-            } else
-                out.get(this.repeatLoop).output(traverser.clone());
-        } else if (3 == this.emitLocation) {
-            if (this.emitCompilation.filterTraverser(traverser.clone()))
-                out.get(this.repeatDone).output(traverser.clone());
-            if (4 == this.untilLocation && 
this.untilCompilation.filterTraverser(traverser.clone()))
-                out.get(this.repeatDone).output(traverser.clone());
-            else
-                out.get(this.repeatLoop).output(traverser.clone());
         } else {
             out.get(this.repeatLoop).output(traverser.clone());
         }
-
     }
 
     /*private void outputDone(final Traverser<C, S> traverser) {
@@ -108,4 +82,4 @@ public class RepeatFn<C, S> extends AbstractFn<C, S, S> {
             //
         }
     }*/
-}
+}
\ No newline at end of file
diff --git 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/util/TopologyUtil.java
 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/util/TopologyUtil.java
index 37c85a7..a0d3a01 100644
--- 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/util/TopologyUtil.java
+++ 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/util/TopologyUtil.java
@@ -33,7 +33,8 @@ import org.apache.tinkerpop.machine.beam.FlatMapFn;
 import org.apache.tinkerpop.machine.beam.InitialFn;
 import org.apache.tinkerpop.machine.beam.MapFn;
 import org.apache.tinkerpop.machine.beam.ReduceFn;
-import org.apache.tinkerpop.machine.beam.RepeatFn;
+import org.apache.tinkerpop.machine.beam.RepeatEndFn;
+import org.apache.tinkerpop.machine.beam.RepeatStartFn;
 import org.apache.tinkerpop.machine.beam.serialization.TraverserCoder;
 import org.apache.tinkerpop.machine.bytecode.Compilation;
 import org.apache.tinkerpop.machine.function.BranchFunction;
@@ -85,13 +86,19 @@ public class TopologyUtil {
             final TupleTag<Traverser<C, S>> repeatLoop = new TupleTag<>();
             sink = source;
             for (int i = 0; i < Beam.MAX_REPETIONS; i++) {
-                final RepeatFn<C, S> fn = new RepeatFn<>(repeatFunction, 
repeatDone, repeatLoop, i == Beam.MAX_REPETIONS - 1, i == 0);
-                final PCollectionTuple outputs = (PCollectionTuple) 
sink.apply(ParDo.of(fn).withOutputTags(repeatLoop, 
TupleTagList.of(repeatDone)));
+                final RepeatStartFn<C, S> startFn = new 
RepeatStartFn<>(repeatFunction, repeatDone, repeatLoop, i == Beam.MAX_REPETIONS 
- 1);
+                PCollectionTuple outputs = (PCollectionTuple) 
sink.apply(ParDo.of(startFn).withOutputTags(repeatLoop, 
TupleTagList.of(repeatDone)));
                 outputs.getAll().values().forEach(c -> c.setCoder(new 
TraverserCoder()));
                 repeatSinks.add(outputs.get(repeatDone));
+                sink = outputs.get(repeatLoop);
                 for (final CFunction<C> ff : 
repeatFunction.getRepeat().getFunctions()) {
-                    sink = TopologyUtil.extend(outputs.get(repeatLoop), ff, 
traverserFactory);
+                    sink = TopologyUtil.extend(sink, ff, traverserFactory);
                 }
+                final RepeatEndFn<C, S> endFn = new 
RepeatEndFn<>(repeatFunction, repeatDone, repeatLoop, i == Beam.MAX_REPETIONS - 
1);
+                outputs = (PCollectionTuple) 
sink.apply(ParDo.of(endFn).withOutputTags(repeatLoop, 
TupleTagList.of(repeatDone)));
+                outputs.getAll().values().forEach(c -> c.setCoder(new 
TraverserCoder()));
+                repeatSinks.add(outputs.get(repeatDone));
+                sink = outputs.get(repeatLoop);
             }
             sink = 
PCollectionList.of(repeatSinks).apply(Flatten.pCollections());
         } else if (function instanceof BranchFunction) {
diff --git 
a/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java
 
b/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java
index dcfe33f..853cd6e 100644
--- 
a/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java
+++ 
b/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java
@@ -57,6 +57,11 @@ public class BeamTest {
         System.out.println(traversal);
         System.out.println(traversal.toList());
         System.out.println("\n----------\n");
+        traversal = 
g.inject(1L).until(__.is(11L)).repeat(__.<Long>incr().incr()).emit(__.constant(true));
+        System.out.println(TraversalUtil.getBytecode(traversal));
+        System.out.println(traversal);
+        System.out.println(traversal.toList());
+        System.out.println("\n----------\n");
         traversal = g.inject(1L, 2L, 
3L).repeat(__.<Long>incr().incr().incr()).until(is(10L));
         
System.out.println(TraversalUtil.getBytecode(traversal).getSourceInstructions());
         System.out.println(TraversalUtil.getBytecode(traversal));
diff --git 
a/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java
 
b/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java
index 4f14c40..00b3b32 100644
--- 
a/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java
+++ 
b/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java
@@ -58,6 +58,11 @@ public class PipesTest {
         System.out.println(traversal);
         System.out.println(traversal.toList());
         System.out.println("\n----------\n");
+        traversal = 
g.inject(1L).until(__.is(5L)).repeat(incr()).emit(__.constant(true));
+        System.out.println(TraversalUtil.getBytecode(traversal));
+        System.out.println(traversal);
+        System.out.println(traversal.toList());
+        System.out.println("\n----------\n");
         traversal = g.inject(7L).union(__.incr(), 
__.<Long>incr().incr().union(__.incr(), __.incr()));
         System.out.println(TraversalUtil.getBytecode(traversal));
         System.out.println(traversal);

Reply via email to