This is an automated email from the ASF dual-hosted git repository. okram pushed a commit to branch tp4 in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
The following commit(s) were added to refs/heads/tp4 by this push: new 5784db0 added RepeatStartFn and RepeatEndFn to Beam to wrap the repeat-loop. This is necessary to handle the case when until() and emit() are split on opposite sides of repeat(). 5784db0 is described below commit 5784db039cc6ce882529de65165fc9e8ae54f84d Author: Marko A. Rodriguez <okramma...@gmail.com> AuthorDate: Sun Mar 17 06:45:06 2019 -0600 added RepeatStartFn and RepeatEndFn to Beam to wrap the repeat-loop. This is necessary to handle the case when until() and emit() are split on opposite sides of repeat(). --- .../beam/{RepeatFn.java => RepeatEndFn.java} | 40 ++++------------------ .../beam/{RepeatFn.java => RepeatStartFn.java} | 38 ++++---------------- .../tinkerpop/machine/beam/util/TopologyUtil.java | 15 +++++--- .../apache/tinkerpop/machine/beam/BeamTest.java | 5 +++ .../apache/tinkerpop/machine/pipes/PipesTest.java | 5 +++ 5 files changed, 34 insertions(+), 69 deletions(-) diff --git a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/RepeatFn.java b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/RepeatEndFn.java similarity index 69% copy from java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/RepeatFn.java copy to java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/RepeatEndFn.java index 90fdb3b..56e34d8 100644 --- a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/RepeatFn.java +++ b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/RepeatEndFn.java @@ -27,7 +27,7 @@ import org.apache.tinkerpop.machine.traverser.Traverser; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public class RepeatFn<C, S> extends AbstractFn<C, S, S> { +public class RepeatEndFn<C, S> extends AbstractFn<C, S, S> { private final int untilLocation; private final int emitLocation; @@ -36,13 +36,12 @@ public class RepeatFn<C, S> extends AbstractFn<C, S, S> { private final TupleTag<Traverser<C, S>> repeatDone; private final TupleTag<Traverser<C, S>> repeatLoop; private final boolean deadEnd; - private final boolean first; - public RepeatFn(final RepeatBranch<C, S> repeatBranch, - final TupleTag<Traverser<C, S>> repeatDone, - final TupleTag<Traverser<C, S>> repeatLoop, - final boolean deadEnd, final boolean first) { + public RepeatEndFn(final RepeatBranch<C, S> repeatBranch, + final TupleTag<Traverser<C, S>> repeatDone, + final TupleTag<Traverser<C, S>> repeatLoop, + final boolean deadEnd) { super(repeatBranch); this.untilLocation = repeatBranch.getUntilLocation(); this.untilCompilation = repeatBranch.getUntil(); @@ -51,43 +50,19 @@ public class RepeatFn<C, S> extends AbstractFn<C, S, S> { this.repeatDone = repeatDone; this.repeatLoop = repeatLoop; this.deadEnd = deadEnd; - this.first = first; } @ProcessElement public void processElement(final @DoFn.Element Traverser<C, S> traverser, final MultiOutputReceiver out) { - if (1 == this.untilLocation) { - if (this.untilCompilation.filterTraverser(traverser.clone())) { - out.get(this.repeatDone).output(traverser.clone()); - } else if (2 == this.emitLocation && this.emitCompilation.filterTraverser(traverser.clone())) { - out.get(this.repeatDone).output(traverser.clone()); - out.get(this.repeatLoop).output(traverser.clone()); - } else { - out.get(this.repeatLoop).output(traverser.clone()); - } - return; - } else if (1 == this.emitLocation) { - if (this.emitCompilation.filterTraverser(traverser.clone())) - out.get(this.repeatDone).output(traverser.clone()); - if (2 == this.untilLocation && this.untilCompilation.filterTraverser(traverser.clone())) - out.get(this.repeatDone).output(traverser.clone()); - else - out.get(this.repeatLoop).output(traverser.clone()); - return; - } else if (this.first) { - out.get(this.repeatLoop).output(traverser.clone()); - return; - } - - if (3 == this.untilLocation) { if (this.untilCompilation.filterTraverser(traverser.clone())) { out.get(this.repeatDone).output(traverser.clone()); } else if (4 == this.emitLocation && this.emitCompilation.filterTraverser(traverser.clone())) { out.get(this.repeatDone).output(traverser.clone()); out.get(this.repeatLoop).output(traverser.clone()); - } else + } else { out.get(this.repeatLoop).output(traverser.clone()); + } } else if (3 == this.emitLocation) { if (this.emitCompilation.filterTraverser(traverser.clone())) out.get(this.repeatDone).output(traverser.clone()); @@ -98,7 +73,6 @@ public class RepeatFn<C, S> extends AbstractFn<C, S, S> { } else { out.get(this.repeatLoop).output(traverser.clone()); } - } /*private void outputDone(final Traverser<C, S> traverser) { diff --git a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/RepeatFn.java b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/RepeatStartFn.java similarity index 69% rename from java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/RepeatFn.java rename to java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/RepeatStartFn.java index 90fdb3b..b11a078 100644 --- a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/RepeatFn.java +++ b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/RepeatStartFn.java @@ -27,7 +27,7 @@ import org.apache.tinkerpop.machine.traverser.Traverser; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public class RepeatFn<C, S> extends AbstractFn<C, S, S> { +public class RepeatStartFn<C, S> extends AbstractFn<C, S, S> { private final int untilLocation; private final int emitLocation; @@ -36,13 +36,12 @@ public class RepeatFn<C, S> extends AbstractFn<C, S, S> { private final TupleTag<Traverser<C, S>> repeatDone; private final TupleTag<Traverser<C, S>> repeatLoop; private final boolean deadEnd; - private final boolean first; - public RepeatFn(final RepeatBranch<C, S> repeatBranch, - final TupleTag<Traverser<C, S>> repeatDone, - final TupleTag<Traverser<C, S>> repeatLoop, - final boolean deadEnd, final boolean first) { + public RepeatStartFn(final RepeatBranch<C, S> repeatBranch, + final TupleTag<Traverser<C, S>> repeatDone, + final TupleTag<Traverser<C, S>> repeatLoop, + final boolean deadEnd) { super(repeatBranch); this.untilLocation = repeatBranch.getUntilLocation(); this.untilCompilation = repeatBranch.getUntil(); @@ -51,7 +50,6 @@ public class RepeatFn<C, S> extends AbstractFn<C, S, S> { this.repeatDone = repeatDone; this.repeatLoop = repeatLoop; this.deadEnd = deadEnd; - this.first = first; } @ProcessElement @@ -65,7 +63,6 @@ public class RepeatFn<C, S> extends AbstractFn<C, S, S> { } else { out.get(this.repeatLoop).output(traverser.clone()); } - return; } else if (1 == this.emitLocation) { if (this.emitCompilation.filterTraverser(traverser.clone())) out.get(this.repeatDone).output(traverser.clone()); @@ -73,32 +70,9 @@ public class RepeatFn<C, S> extends AbstractFn<C, S, S> { out.get(this.repeatDone).output(traverser.clone()); else out.get(this.repeatLoop).output(traverser.clone()); - return; - } else if (this.first) { - out.get(this.repeatLoop).output(traverser.clone()); - return; - } - - - if (3 == this.untilLocation) { - if (this.untilCompilation.filterTraverser(traverser.clone())) { - out.get(this.repeatDone).output(traverser.clone()); - } else if (4 == this.emitLocation && this.emitCompilation.filterTraverser(traverser.clone())) { - out.get(this.repeatDone).output(traverser.clone()); - out.get(this.repeatLoop).output(traverser.clone()); - } else - out.get(this.repeatLoop).output(traverser.clone()); - } else if (3 == this.emitLocation) { - if (this.emitCompilation.filterTraverser(traverser.clone())) - out.get(this.repeatDone).output(traverser.clone()); - if (4 == this.untilLocation && this.untilCompilation.filterTraverser(traverser.clone())) - out.get(this.repeatDone).output(traverser.clone()); - else - out.get(this.repeatLoop).output(traverser.clone()); } else { out.get(this.repeatLoop).output(traverser.clone()); } - } /*private void outputDone(final Traverser<C, S> traverser) { @@ -108,4 +82,4 @@ public class RepeatFn<C, S> extends AbstractFn<C, S, S> { // } }*/ -} +} \ No newline at end of file diff --git a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/util/TopologyUtil.java b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/util/TopologyUtil.java index 37c85a7..a0d3a01 100644 --- a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/util/TopologyUtil.java +++ b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/util/TopologyUtil.java @@ -33,7 +33,8 @@ import org.apache.tinkerpop.machine.beam.FlatMapFn; import org.apache.tinkerpop.machine.beam.InitialFn; import org.apache.tinkerpop.machine.beam.MapFn; import org.apache.tinkerpop.machine.beam.ReduceFn; -import org.apache.tinkerpop.machine.beam.RepeatFn; +import org.apache.tinkerpop.machine.beam.RepeatEndFn; +import org.apache.tinkerpop.machine.beam.RepeatStartFn; import org.apache.tinkerpop.machine.beam.serialization.TraverserCoder; import org.apache.tinkerpop.machine.bytecode.Compilation; import org.apache.tinkerpop.machine.function.BranchFunction; @@ -85,13 +86,19 @@ public class TopologyUtil { final TupleTag<Traverser<C, S>> repeatLoop = new TupleTag<>(); sink = source; for (int i = 0; i < Beam.MAX_REPETIONS; i++) { - final RepeatFn<C, S> fn = new RepeatFn<>(repeatFunction, repeatDone, repeatLoop, i == Beam.MAX_REPETIONS - 1, i == 0); - final PCollectionTuple outputs = (PCollectionTuple) sink.apply(ParDo.of(fn).withOutputTags(repeatLoop, TupleTagList.of(repeatDone))); + final RepeatStartFn<C, S> startFn = new RepeatStartFn<>(repeatFunction, repeatDone, repeatLoop, i == Beam.MAX_REPETIONS - 1); + PCollectionTuple outputs = (PCollectionTuple) sink.apply(ParDo.of(startFn).withOutputTags(repeatLoop, TupleTagList.of(repeatDone))); outputs.getAll().values().forEach(c -> c.setCoder(new TraverserCoder())); repeatSinks.add(outputs.get(repeatDone)); + sink = outputs.get(repeatLoop); for (final CFunction<C> ff : repeatFunction.getRepeat().getFunctions()) { - sink = TopologyUtil.extend(outputs.get(repeatLoop), ff, traverserFactory); + sink = TopologyUtil.extend(sink, ff, traverserFactory); } + final RepeatEndFn<C, S> endFn = new RepeatEndFn<>(repeatFunction, repeatDone, repeatLoop, i == Beam.MAX_REPETIONS - 1); + outputs = (PCollectionTuple) sink.apply(ParDo.of(endFn).withOutputTags(repeatLoop, TupleTagList.of(repeatDone))); + outputs.getAll().values().forEach(c -> c.setCoder(new TraverserCoder())); + repeatSinks.add(outputs.get(repeatDone)); + sink = outputs.get(repeatLoop); } sink = PCollectionList.of(repeatSinks).apply(Flatten.pCollections()); } else if (function instanceof BranchFunction) { diff --git a/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java b/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java index dcfe33f..853cd6e 100644 --- a/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java +++ b/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java @@ -57,6 +57,11 @@ public class BeamTest { System.out.println(traversal); System.out.println(traversal.toList()); System.out.println("\n----------\n"); + traversal = g.inject(1L).until(__.is(11L)).repeat(__.<Long>incr().incr()).emit(__.constant(true)); + System.out.println(TraversalUtil.getBytecode(traversal)); + System.out.println(traversal); + System.out.println(traversal.toList()); + System.out.println("\n----------\n"); traversal = g.inject(1L, 2L, 3L).repeat(__.<Long>incr().incr().incr()).until(is(10L)); System.out.println(TraversalUtil.getBytecode(traversal).getSourceInstructions()); System.out.println(TraversalUtil.getBytecode(traversal)); diff --git a/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java b/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java index 4f14c40..00b3b32 100644 --- a/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java +++ b/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java @@ -58,6 +58,11 @@ public class PipesTest { System.out.println(traversal); System.out.println(traversal.toList()); System.out.println("\n----------\n"); + traversal = g.inject(1L).until(__.is(5L)).repeat(incr()).emit(__.constant(true)); + System.out.println(TraversalUtil.getBytecode(traversal)); + System.out.println(traversal); + System.out.println(traversal.toList()); + System.out.println("\n----------\n"); traversal = g.inject(7L).union(__.incr(), __.<Long>incr().incr().union(__.incr(), __.incr())); System.out.println(TraversalUtil.getBytecode(traversal)); System.out.println(traversal);