This is an automated email from the ASF dual-hosted git repository.

okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit bfa48cbe9cdcaa53faa2925128039e0917fca53d
Author: Marko A. Rodriguez <okramma...@gmail.com>
AuthorDate: Wed Mar 13 11:35:06 2019 -0600

    BranchFunctions are no longer Java functions. Given that the implementation 
of branching is heavily dependent on processing engine semantics, 
BranchFunctions simply expose a branch selector and a Map of selector/traversal 
pairs. It is up to the processor provider to use that information to construct 
a branching topology. For Pipes, it's pull-based. For Beam, it's push-based.
---
 .../org/apache/tinkerpop/language/Symbols.java     |  5 ++
 .../org/apache/tinkerpop/language/Traversal.java   |  5 ++
 .../tinkerpop/machine/bytecode/BytecodeUtil.java   |  5 ++
 .../machine/functions/AbstractFunction.java        |  2 +-
 .../machine/functions/BranchFunction.java          |  5 +-
 .../machine/functions/branch/ChooseBranch.java     | 14 +----
 .../machine/functions/branch/RepeatBranch.java     | 19 +-----
 .../machine/functions/branch/UnionBranch.java      | 15 +----
 .../machine/functions/filter/FilterFilter.java     |  4 +-
 .../machine/functions/filter/HasKeyFilter.java     |  2 +-
 .../functions/filter/HasKeyValueFilter.java        |  2 +-
 .../machine/functions/filter/IdentityFilter.java   |  2 +-
 .../machine/functions/filter/IsFilter.java         |  2 +-
 .../machine/functions/flatmap/UnfoldFlatMap.java   |  2 +-
 .../machine/functions/initial/InjectInitial.java   |  2 +-
 .../tinkerpop/machine/functions/map/IncrMap.java   |  2 +-
 .../tinkerpop/machine/functions/map/MapMap.java    |  2 +-
 .../tinkerpop/machine/functions/map/PathMap.java   |  2 +-
 .../machine/functions/reduce/CountReduce.java      |  2 +-
 .../machine/functions/reduce/GroupCountReduce.java |  2 +-
 .../machine/functions/reduce/SumReduce.java        |  2 +-
 .../tinkerpop/machine/traversers/Traverser.java    |  4 --
 .../apache/tinkerpop/machine/beam/BranchFn.java    | 11 ++--
 .../apache/tinkerpop/machine/beam/BeamTest.java    |  5 ++
 .../apache/tinkerpop/machine/pipes/BranchStep.java | 43 ++++++++-----
 .../org/apache/tinkerpop/machine/pipes/Pipes.java  |  7 ++-
 .../apache/tinkerpop/machine/pipes/RepeatStep.java | 70 ++++++++++++++++++++++
 .../apache/tinkerpop/machine/pipes/PipesTest.java  |  2 +-
 28 files changed, 152 insertions(+), 88 deletions(-)

diff --git a/java/core/src/main/java/org/apache/tinkerpop/language/Symbols.java 
b/java/core/src/main/java/org/apache/tinkerpop/language/Symbols.java
index efeeaad..5ec8eea 100644
--- a/java/core/src/main/java/org/apache/tinkerpop/language/Symbols.java
+++ b/java/core/src/main/java/org/apache/tinkerpop/language/Symbols.java
@@ -34,6 +34,7 @@ public final class Symbols {
 
 
     // INSTRUCTION OPS
+    public static final String CHOOSE_IF_THEN = "chooseIfThen";
     public static final String CHOOSE_IF_THEN_ELSE = "chooseIfThenElse";
     public static final String COUNT = "count";
     public static final String FILTER = "filter";
@@ -53,6 +54,10 @@ public final class Symbols {
 
     public Type getOpType(final String op) {
         switch (op) {
+            case CHOOSE_IF_THEN:
+                return Type.BRANCH;
+            case CHOOSE_IF_THEN_ELSE:
+                return Type.BRANCH;
             case COUNT:
                 return Type.REDUCE;
             case FILTER:
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/language/Traversal.java 
b/java/core/src/main/java/org/apache/tinkerpop/language/Traversal.java
index 9a17bc5..5ba6f68 100644
--- a/java/core/src/main/java/org/apache/tinkerpop/language/Traversal.java
+++ b/java/core/src/main/java/org/apache/tinkerpop/language/Traversal.java
@@ -68,6 +68,11 @@ public class Traversal<C, S, E> implements Iterator<E> {
         return (Traversal) this;
     }
 
+    public <R> Traversal<C, S, R> choose(final Traversal<C, E, ?> predicate, 
final Traversal<C, S, R> trueTraversal) {
+        this.bytecode.addInstruction(this.currentCoefficient, 
Symbols.CHOOSE_IF_THEN, predicate, trueTraversal);
+        return (Traversal) this;
+    }
+
     public Traversal<C, S, Long> count() {
         this.bytecode.addInstruction(this.currentCoefficient, Symbols.COUNT);
         return (Traversal) this;
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java
index b7110a3..6959063 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java
@@ -137,6 +137,11 @@ public final class BytecodeUtil {
         final Coefficient<C> coefficient = instruction.coefficient();
         final Set<String> labels = instruction.labels();
         switch (op) {
+            case Symbols.CHOOSE_IF_THEN:
+                return new ChooseBranch<>(coefficient, labels,
+                        Compilation.compileOne(instruction.args()[0]),
+                        Compilation.compileOne(instruction.args()[1]),
+                        null);
             case Symbols.CHOOSE_IF_THEN_ELSE:
                 return new ChooseBranch<>(coefficient, labels,
                         Compilation.compileOne(instruction.args()[0]),
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/AbstractFunction.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/AbstractFunction.java
index d9d149b..402389b 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/AbstractFunction.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/AbstractFunction.java
@@ -26,7 +26,7 @@ import java.util.Set;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public abstract class AbstractFunction<C, S, E> implements CFunction<C> {
+public abstract class AbstractFunction<C> implements CFunction<C> {
 
     private Coefficient<C> coefficient;
     private Set<String> labels;
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/BranchFunction.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/BranchFunction.java
index 159b2c8..e2ecfe0 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/BranchFunction.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/BranchFunction.java
@@ -20,17 +20,14 @@ package org.apache.tinkerpop.machine.functions;
 
 import org.apache.tinkerpop.machine.bytecode.Compilation;
 import org.apache.tinkerpop.machine.functions.branch.selector.Selector;
-import org.apache.tinkerpop.machine.traversers.Traverser;
 
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.function.Function;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public interface BranchFunction<C, S, E, M> extends Function<Traverser<C, S>, 
Iterator<Traverser<C, E>>>, InternalFunction<C> {
+public interface BranchFunction<C, S, E, M> extends InternalFunction<C> {
 
     public Selector<C, S, M> getBranchSelector();
 
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/ChooseBranch.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/ChooseBranch.java
index c34763f..68d13f1 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/ChooseBranch.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/ChooseBranch.java
@@ -24,12 +24,10 @@ import 
org.apache.tinkerpop.machine.functions.AbstractFunction;
 import org.apache.tinkerpop.machine.functions.BranchFunction;
 import org.apache.tinkerpop.machine.functions.branch.selector.HasNextSelector;
 import org.apache.tinkerpop.machine.functions.branch.selector.Selector;
-import org.apache.tinkerpop.machine.traversers.Traverser;
 import org.apache.tinkerpop.util.StringFactory;
 
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -37,7 +35,7 @@ import java.util.Set;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class ChooseBranch<C, S, E> extends AbstractFunction<C, S, 
Iterator<Traverser<C, E>>> implements BranchFunction<C, S, E, Boolean> {
+public class ChooseBranch<C, S, E> extends AbstractFunction<C> implements 
BranchFunction<C, S, E, Boolean> {
 
     private final HasNextSelector<C, S> branchSelector;
     private final Map<Boolean, List<Compilation<C, S, E>>> branches;
@@ -59,14 +57,8 @@ public class ChooseBranch<C, S, E> extends 
AbstractFunction<C, S, Iterator<Trave
         this.branchSelector = new HasNextSelector<>(predicate);
         this.branches = new HashMap<>();
         this.branches.put(Boolean.TRUE, Collections.singletonList(trueBranch));
-        this.branches.put(Boolean.FALSE, 
Collections.singletonList(falseBranch));
-    }
-
-    @Override
-    public Iterator<Traverser<C, E>> apply(final Traverser<C, S> traverser) {
-        return this.predicate.filterTraverser(traverser) ?
-                this.trueBranch.addTraverser(traverser) :
-                this.falseBranch.addTraverser(traverser);
+        if (null != this.falseBranch)
+            this.branches.put(Boolean.FALSE, 
Collections.singletonList(falseBranch));
     }
 
     @Override
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/RepeatBranch.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/RepeatBranch.java
index cdbbad1..500b592 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/RepeatBranch.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/RepeatBranch.java
@@ -23,12 +23,8 @@ import org.apache.tinkerpop.machine.coefficients.Coefficient;
 import org.apache.tinkerpop.machine.functions.AbstractFunction;
 import org.apache.tinkerpop.machine.functions.BranchFunction;
 import org.apache.tinkerpop.machine.functions.branch.selector.Selector;
-import org.apache.tinkerpop.machine.processor.Processor;
-import org.apache.tinkerpop.machine.traversers.Traverser;
-import org.apache.tinkerpop.util.IteratorUtils;
 import org.apache.tinkerpop.util.StringFactory;
 
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -36,7 +32,7 @@ import java.util.Set;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class RepeatBranch<C, S> extends AbstractFunction<C, S, 
Iterator<Traverser<C, S>>> implements BranchFunction<C, S, S, Boolean> {
+public class RepeatBranch<C, S> extends AbstractFunction<C> implements 
BranchFunction<C, S, S, Boolean> {
 
     private final Compilation<C, S, S> repeat;
     private final Compilation<C, S, ?> until;
@@ -48,19 +44,6 @@ public class RepeatBranch<C, S> extends AbstractFunction<C, 
S, Iterator<Traverse
     }
 
     @Override
-    public Iterator<Traverser<C, S>> apply(final Traverser<C, S> traverser) {
-        final Processor<C, S, S> repeatProcessor = this.repeat.getProcessor();
-        repeatProcessor.addStart(traverser);
-        return IteratorUtils.filter(repeatProcessor, t -> {
-            if (!this.until.filterTraverser(t)) {
-                repeatProcessor.addStart(t);
-                return false;
-            } else
-                return true;
-        });
-    }
-
-    @Override
     public String toString() {
         return StringFactory.makeFunctionString(this, this.repeat, this.until);
     }
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/UnionBranch.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/UnionBranch.java
index 3195a40..3381f73 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/UnionBranch.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/UnionBranch.java
@@ -24,12 +24,9 @@ import 
org.apache.tinkerpop.machine.functions.AbstractFunction;
 import org.apache.tinkerpop.machine.functions.BranchFunction;
 import org.apache.tinkerpop.machine.functions.branch.selector.Selector;
 import org.apache.tinkerpop.machine.functions.branch.selector.TrueSelector;
-import org.apache.tinkerpop.machine.traversers.Traverser;
-import org.apache.tinkerpop.util.MultiIterator;
 import org.apache.tinkerpop.util.StringFactory;
 
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -37,11 +34,10 @@ import java.util.Set;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class UnionBranch<C, S, E> extends AbstractFunction<C, S, 
Iterator<Traverser<C, E>>> implements BranchFunction<C, S, E, Boolean> {
+public final class UnionBranch<C, S, E> extends AbstractFunction<C> implements 
BranchFunction<C, S, E, Boolean> {
 
     private final Map<Boolean, List<Compilation<C, S, E>>> branches;
 
-
     public UnionBranch(final Coefficient<C> coefficient, final Set<String> 
labels, final List<Compilation<C, S, E>> branches) {
         super(coefficient, labels);
         this.branches = new HashMap<>();
@@ -49,15 +45,6 @@ public final class UnionBranch<C, S, E> extends 
AbstractFunction<C, S, Iterator<
     }
 
     @Override
-    public Iterator<Traverser<C, E>> apply(final Traverser<C, S> traverser) {
-        final MultiIterator<Traverser<C, E>> iterator = new MultiIterator<>();
-        for (final Compilation<C, S, E> branch : 
this.branches.get(Boolean.TRUE)) {
-            iterator.addIterator(branch.addTraverser(traverser));
-        }
-        return iterator;
-    }
-
-    @Override
     public String toString() {
         return StringFactory.makeFunctionString(this, 
this.branches.values().iterator().next()); // make a flat array
     }
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/FilterFilter.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/FilterFilter.java
index 5f551c6..3c623eb 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/FilterFilter.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/FilterFilter.java
@@ -26,14 +26,12 @@ import 
org.apache.tinkerpop.machine.functions.NestedFunction;
 import org.apache.tinkerpop.machine.traversers.Traverser;
 import org.apache.tinkerpop.util.StringFactory;
 
-import java.util.Collections;
-import java.util.List;
 import java.util.Set;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class FilterFilter<C, S> extends AbstractFunction<C, S, S> 
implements FilterFunction<C, S>, NestedFunction<C> {
+public final class FilterFilter<C, S> extends AbstractFunction<C> implements 
FilterFunction<C, S>, NestedFunction<C> {
 
     private final Compilation<C, S, ?> internalFilter;
 
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/HasKeyFilter.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/HasKeyFilter.java
index 09320a7..63e2b71 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/HasKeyFilter.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/HasKeyFilter.java
@@ -31,7 +31,7 @@ import java.util.Set;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class HasKeyFilter<C, K, V> extends AbstractFunction<C, Map<K, 
V>, Map<K, V>> implements FilterFunction<C, Map<K, V>> {
+public final class HasKeyFilter<C, K, V> extends AbstractFunction<C> 
implements FilterFunction<C, Map<K, V>> {
 
     private final Argument<K> key;
 
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/HasKeyValueFilter.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/HasKeyValueFilter.java
index 70aecc3..c713c55 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/HasKeyValueFilter.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/HasKeyValueFilter.java
@@ -31,7 +31,7 @@ import java.util.Set;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class HasKeyValueFilter<C, K, V> extends AbstractFunction<C, 
Map<K, V>, Map<K, V>> implements FilterFunction<C, Map<K, V>> {
+public final class HasKeyValueFilter<C, K, V> extends AbstractFunction<C> 
implements FilterFunction<C, Map<K, V>> {
 
     private final Argument<K> key;
     private final Argument<V> value;
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/IdentityFilter.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/IdentityFilter.java
index 4d479ca..418fd6e 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/IdentityFilter.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/IdentityFilter.java
@@ -28,7 +28,7 @@ import java.util.Set;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class IdentityFilter<C, S> extends AbstractFunction<C, S, S> 
implements FilterFunction<C, S> {
+public final class IdentityFilter<C, S> extends AbstractFunction<C> implements 
FilterFunction<C, S> {
 
     public IdentityFilter(final Coefficient<C> coefficient, final Set<String> 
labels) {
         super(coefficient, labels);
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/IsFilter.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/IsFilter.java
index 129f40f..b556bc7 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/IsFilter.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/IsFilter.java
@@ -29,7 +29,7 @@ import java.util.Set;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class IsFilter<C, S> extends AbstractFunction<C, S, S> implements 
FilterFunction<C, S> {
+public final class IsFilter<C, S> extends AbstractFunction<C> implements 
FilterFunction<C, S> {
 
     private S object;
 
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/flatmap/UnfoldFlatMap.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/flatmap/UnfoldFlatMap.java
index 807ae39..ff05c8a 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/flatmap/UnfoldFlatMap.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/flatmap/UnfoldFlatMap.java
@@ -33,7 +33,7 @@ import java.util.Set;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class UnfoldFlatMap<C, S, E> extends AbstractFunction<C, S, E> 
implements FlatMapFunction<C, S, E> {
+public class UnfoldFlatMap<C, S, E> extends AbstractFunction<C> implements 
FlatMapFunction<C, S, E> {
 
     public UnfoldFlatMap(final Coefficient<C> coefficient, final Set<String> 
labels) {
         super(coefficient, labels);
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/initial/InjectInitial.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/initial/InjectInitial.java
index 4c15206..1f39997 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/initial/InjectInitial.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/initial/InjectInitial.java
@@ -30,7 +30,7 @@ import java.util.stream.Stream;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class InjectInitial<C, S> extends AbstractFunction<C, S, S> implements 
InitialFunction<C, S> {
+public class InjectInitial<C, S> extends AbstractFunction<C> implements 
InitialFunction<C, S> {
 
     private final S[] objects;
 
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/IncrMap.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/IncrMap.java
index e1eff44..15a193e 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/IncrMap.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/IncrMap.java
@@ -28,7 +28,7 @@ import java.util.Set;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class IncrMap<C> extends AbstractFunction<C, Long, Long> implements 
MapFunction<C, Long, Long> {
+public class IncrMap<C> extends AbstractFunction<C> implements MapFunction<C, 
Long, Long> {
 
     public IncrMap(final Coefficient<C> coefficient, final Set<String> labels) 
{
         super(coefficient, labels);
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/MapMap.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/MapMap.java
index b0d0fc5..4e2185e 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/MapMap.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/MapMap.java
@@ -33,7 +33,7 @@ import java.util.Set;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class MapMap<C, S, E> extends AbstractFunction<C, S, E> implements 
MapFunction<C, S, E>, NestedFunction<C> {
+public class MapMap<C, S, E> extends AbstractFunction<C> implements 
MapFunction<C, S, E>, NestedFunction<C> {
 
     private final Compilation<C, S, E> internalMap;
 
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/PathMap.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/PathMap.java
index 8e66ccd..dbf2a51 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/PathMap.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/PathMap.java
@@ -32,7 +32,7 @@ import java.util.Set;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class PathMap<C, S> extends AbstractFunction<C, S, Path> implements 
MapFunction<C, S, Path> {
+public class PathMap<C, S> extends AbstractFunction<C> implements 
MapFunction<C, S, Path> {
 
     private final CompilationRing<C, Object, Object> compilationRing;
 
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/CountReduce.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/CountReduce.java
index 65dde8e..f96e4e4 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/CountReduce.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/CountReduce.java
@@ -28,7 +28,7 @@ import java.util.Set;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class CountReduce<C, S> extends AbstractFunction<C, S, Long> implements 
ReduceFunction<C, S, Long> {
+public class CountReduce<C, S> extends AbstractFunction<C> implements 
ReduceFunction<C, S, Long> {
 
     public CountReduce(final Coefficient<C> coefficient, final Set<String> 
labels) {
         super(coefficient, labels);
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/GroupCountReduce.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/GroupCountReduce.java
index d9f713f..1bed8d9 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/GroupCountReduce.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/GroupCountReduce.java
@@ -34,7 +34,7 @@ import java.util.Set;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class GroupCountReduce<C, S, E> extends AbstractFunction<C, S, Map<E, 
Long>> implements ReduceFunction<C, S, Map<E, Long>>, NestedFunction<C> {
+public class GroupCountReduce<C, S, E> extends AbstractFunction<C> implements 
ReduceFunction<C, S, Map<E, Long>>, NestedFunction<C> {
 
     private final Compilation<C, S, E> byCompilation;
 
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/SumReduce.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/SumReduce.java
index 8963c2f..bd08c0e 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/SumReduce.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/SumReduce.java
@@ -29,7 +29,7 @@ import java.util.Set;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class SumReduce<C, S extends Number> extends AbstractFunction<C, S, S> 
implements ReduceFunction<C, S, S> {
+public class SumReduce<C, S extends Number> extends AbstractFunction<C> 
implements ReduceFunction<C, S, S> {
 
     public SumReduce(final Coefficient<C> coefficient, final Set<String> 
labels) {
         super(coefficient, labels);
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/Traverser.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/Traverser.java
index 1a4ed56..1174206 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/Traverser.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/Traverser.java
@@ -59,10 +59,6 @@ public interface Traverser<C, S> extends Serializable, 
Cloneable {
         return IteratorUtils.map(function.apply(this), e -> 
this.split(function, e));
     }
 
-    public default <E,M> Iterator<Traverser<C, E>> branch(final 
BranchFunction<C, S, E,M> function) {
-        return function.apply(this);
-    }
-
     //public default void sideEffect(final SideEffectFunction<C,S> function);
 
     public default <E> Traverser<C, E> reduce(final ReduceFunction<C, S, E> 
function, final E reducedValue) {
diff --git 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BranchFn.java
 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BranchFn.java
index f5560d3..1159553 100644
--- 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BranchFn.java
+++ 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BranchFn.java
@@ -41,10 +41,13 @@ public class BranchFn<C, S, E, M> extends AbstractFn<C, S, 
S> {
     }
 
     @ProcessElement
-    public void processElement(final @Element Traverser<C, S> traverser, final 
MultiOutputReceiver out) {
-        final Optional<M> selector = this.branchSelector.from(traverser);
-        if (selector.isPresent())
-            
out.get(this.branches.get(selector.get())).output(traverser.clone());
+    public void processElement(final @Element Traverser<C, S> traverser, final 
MultiOutputReceiver outputs) {
+        final Optional<M> selector = 
this.branchSelector.from(traverser.clone());
+        if (selector.isPresent()) {
+            final TupleTag<Traverser<C, S>> outputTag = 
this.branches.get(selector.get());
+            if (null != outputTag)
+                outputs.get(outputTag).output(traverser.clone());
+        }
 
     }
 }
\ No newline at end of file
diff --git 
a/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java
 
b/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java
index b161995..d04bac1 100644
--- 
a/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java
+++ 
b/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java
@@ -62,5 +62,10 @@ public class BeamTest {
         System.out.println(TraversalUtil.getBytecode(traversal));
         System.out.println(traversal);
         System.out.println(traversal.toList());
+
+        traversal = g.inject(8L).choose(__.is(7L),__.incr());
+        System.out.println(TraversalUtil.getBytecode(traversal));
+        System.out.println(traversal);
+        System.out.println(traversal.toList());
     }
 }
diff --git 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/BranchStep.java
 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/BranchStep.java
index fec154a..987be52 100644
--- 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/BranchStep.java
+++ 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/BranchStep.java
@@ -18,41 +18,56 @@
  */
 package org.apache.tinkerpop.machine.pipes;
 
+import org.apache.tinkerpop.machine.bytecode.Compilation;
 import org.apache.tinkerpop.machine.functions.BranchFunction;
+import org.apache.tinkerpop.machine.functions.branch.selector.Selector;
 import org.apache.tinkerpop.machine.traversers.Traverser;
+import org.apache.tinkerpop.util.MultiIterator;
 
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
 public final class BranchStep<C, S, E, M> extends AbstractStep<C, S, E> {
 
-    private final BranchFunction<C, S, E, M> branchFunction;
-    private Iterator<Traverser<C, E>> iterator = Collections.emptyIterator();
+    private final Selector<C, S, M> branchSelector;
+    private final Map<M, List<Compilation<C, S, E>>> branches;
+    private Iterator<Traverser<C, E>> output = Collections.emptyIterator();
 
     public BranchStep(final AbstractStep<C, ?, S> previousStep, final 
BranchFunction<C, S, E, M> branchFunction) {
         super(previousStep, branchFunction);
-        this.branchFunction = branchFunction;
+        this.branchSelector = branchFunction.getBranchSelector();
+        this.branches = branchFunction.getBranches();
     }
 
     @Override
     public boolean hasNext() {
-        while (true) {
-            if (this.iterator.hasNext())
-                return true;
-            else if (super.hasNext())
-                this.iterator = 
super.getPreviousTraverser().branch(this.branchFunction);
-            else
-                return false;
-        }
+        this.stageOutput();
+        return this.output.hasNext();
     }
 
     @Override
     public Traverser<C, E> next() {
-        if (!this.iterator.hasNext())
-            this.iterator = 
super.getPreviousTraverser().branch(this.branchFunction);
-        return this.iterator.next();
+        this.stageOutput();
+        return this.output.next();
     }
+
+    private final void stageOutput() {
+        while (!this.output.hasNext() && super.hasNext()) {
+            final Traverser<C, S> traverser = super.getPreviousTraverser();
+            final Optional<M> token = this.branchSelector.from(traverser);
+            if (token.isPresent()) {
+                this.output = new MultiIterator<>();
+                for (final Compilation<C, S, E> branch : 
this.branches.get(token.get())) {
+                    ((MultiIterator<Traverser<C, E>>) 
this.output).addIterator(branch.addTraverser(traverser.clone()));
+                }
+            }
+        }
+    }
+
 }
\ No newline at end of file
diff --git 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java
 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java
index 24e0cd4..f04be72 100644
--- 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java
+++ 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java
@@ -26,6 +26,7 @@ import org.apache.tinkerpop.machine.functions.FlatMapFunction;
 import org.apache.tinkerpop.machine.functions.InitialFunction;
 import org.apache.tinkerpop.machine.functions.MapFunction;
 import org.apache.tinkerpop.machine.functions.ReduceFunction;
+import org.apache.tinkerpop.machine.functions.branch.RepeatBranch;
 import org.apache.tinkerpop.machine.pipes.util.BasicReducer;
 import org.apache.tinkerpop.machine.processor.Processor;
 import org.apache.tinkerpop.machine.traversers.Traverser;
@@ -46,8 +47,10 @@ public final class Pipes<C, S, E> implements Processor<C, S, 
E> {
         AbstractStep<C, ?, ?> previousStep = EmptyStep.instance();
         for (final CFunction<?> function : compilation.getFunctions()) {
             final AbstractStep nextStep;
-            if (function instanceof BranchFunction)
-                nextStep = new BranchStep(previousStep, (BranchFunction<C, ?, 
?,?>) function);
+            if (function instanceof RepeatBranch)
+                nextStep = new RepeatStep(previousStep, (RepeatBranch<C, ?>) 
function);
+            else if (function instanceof BranchFunction)
+                nextStep = new BranchStep(previousStep, (BranchFunction<C, ?, 
?, ?>) function);
             else if (function instanceof FilterFunction)
                 nextStep = new FilterStep(previousStep, (FilterFunction<C, ?>) 
function);
             else if (function instanceof FlatMapFunction)
diff --git 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/RepeatStep.java
 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/RepeatStep.java
new file mode 100644
index 0000000..a303208
--- /dev/null
+++ 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/RepeatStep.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.machine.pipes;
+
+import org.apache.tinkerpop.machine.bytecode.Compilation;
+import org.apache.tinkerpop.machine.functions.branch.RepeatBranch;
+import org.apache.tinkerpop.machine.traversers.Traverser;
+import org.apache.tinkerpop.util.IteratorUtils;
+
+import java.util.Iterator;
+
+/** Pipes step for a {@link RepeatBranch}: traversers run through the repeat compilation and are emitted only once the until compilation accepts them; rejected ones are fed back into repeat.
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class RepeatStep<C, S> extends AbstractStep<C, S, S> {
+
+    private final Compilation<C, S, ?> until; // compilation used as the exit test (see filterTraverser in the constructor)
+    private final Compilation<C, S, S> repeat; // compilation applied on every loop iteration
+    private final Iterator<Traverser<C, S>> output; // results of repeat's processor, filtered by the until test
+
+    public RepeatStep(final AbstractStep<C, ?, S> previousStep, final 
RepeatBranch<C, S> repeatFunction) {
+        super(previousStep, repeatFunction);
+        this.until = repeatFunction.getUntil();
+        this.repeat = repeatFunction.getRepeat();
+        this.output = IteratorUtils.filter(this.repeat.getProcessor(), t -> { // NOTE(review): assumes getProcessor() returns the same processor instance on every call - confirm
+            if (!this.until.filterTraverser(t)) { // until rejects t: loop it back through repeat instead of emitting it
+                this.repeat.getProcessor().addStart(t); // NOTE(review): adds a start to the processor being iterated, from inside its own filter predicate - confirm the processor tolerates this
+                return false; // not emitted on this pass
+            } else
+                return true; // until accepts t: emit it downstream
+        });
+    }
+
+    @Override
+    public boolean hasNext() {
+        this.stageOutput(); // feed upstream traversers into repeat first if nothing is currently available
+        return this.output.hasNext();
+    }
+
+    @Override
+    public Traverser<C, S> next() {
+        this.stageOutput(); // same staging as hasNext() so next() also works without a prior hasNext() call
+        return this.output.next();
+    }
+
+    private final void stageOutput() { // Pulls upstream traversers into repeat until output has a result or upstream is exhausted.
+        while (!this.output.hasNext() && super.hasNext()) {
+            this.repeat.addTraverser(super.getPreviousTraverser()); // return value is ignored here; results are read through this.output instead - NOTE(review): presumably addTraverser feeds the processor that output wraps, confirm
+        }
+
+    }
+
+}
+
diff --git 
a/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java
 
b/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java
index d51b966..81374a4 100644
--- 
a/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java
+++ 
b/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java
@@ -54,7 +54,7 @@ public class PipesTest {
         System.out.println(traversal);
         System.out.println(traversal.toList());
         System.out.println("\n----------\n");
-        traversal = 
g.inject(8L).choose(__.is(7L),__.incr(),__.<Long>incr().incr());
+        traversal = g.inject(7L).choose(__.is(7L),__.incr());
         System.out.println(TraversalUtil.getBytecode(traversal));
         System.out.println(traversal);
         System.out.println(traversal.toList());

Reply via email to