This is an automated email from the ASF dual-hosted git repository.

okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit 7e313cb66ce4e557c284b98b3a194bda505f80de
Author: Marko A. Rodriguez <okramma...@gmail.com>
AuthorDate: Mon Mar 11 15:41:40 2019 -0600

    added test/ module and stubbed the first IdentityTest.
---
 .../tinkerpop/machine/bytecode/BytecodeUtil.java   |  4 ++--
 .../{NestedFunction.java => BranchFunction.java}   | 18 ++++-----------
 .../{NestedFunction.java => InternalFunction.java} | 13 +----------
 .../machine/functions/NestedFunction.java          | 13 +----------
 .../UnionFlatMap.java => branch/UnionBranch.java}  | 24 +++++++++----------
 .../machine/functions/filter/FilterFilter.java     |  2 +-
 .../tinkerpop/machine/functions/map/MapMap.java    |  2 +-
 .../apache/tinkerpop/machine/traversers/Path.java  |  3 +++
 .../tinkerpop/machine/traversers/Traverser.java    |  5 ++++
 .../org/apache/tinkerpop/machine/beam/Beam.java    | 15 ++++++------
 .../apache/tinkerpop/machine/beam/BranchFn.java}   | 27 ++++++++++++----------
 java/machine/pipes/pom.xml                         |  7 ++++++
 .../pipes/{FlatMapStep.java => BranchStep.java}    | 23 +++++++++++-------
 .../tinkerpop/machine/pipes/FlatMapStep.java       |  9 +++++++-
 .../org/apache/tinkerpop/machine/pipes/Pipes.java  | 11 +++++----
 .../machine/pipes/PipesIdentityTest.java}          | 26 ++++++++++-----------
 .../apache/tinkerpop/machine/pipes/PipesTest.java  |  4 ++--
 java/pom.xml                                       |  1 +
 java/{machine/pipes => test}/pom.xml               | 17 +++++++++++---
 .../machine/functions/filters/IdentityTest.java}   | 22 ++++++++----------
 20 files changed, 129 insertions(+), 117 deletions(-)

diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java
index 07f2f1d..4746e97 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java
@@ -24,7 +24,7 @@ import org.apache.tinkerpop.machine.functions.CFunction;
 import org.apache.tinkerpop.machine.functions.filter.FilterFilter;
 import org.apache.tinkerpop.machine.functions.filter.IdentityFilter;
 import org.apache.tinkerpop.machine.functions.filter.IsFilter;
-import org.apache.tinkerpop.machine.functions.flatMap.UnionFlatMap;
+import org.apache.tinkerpop.machine.functions.branch.UnionBranch;
 import org.apache.tinkerpop.machine.functions.initial.InjectInitial;
 import org.apache.tinkerpop.machine.functions.map.IncrMap;
 import org.apache.tinkerpop.machine.functions.map.MapMap;
@@ -143,7 +143,7 @@ public final class BytecodeUtil {
                 for (final Bytecode<C> arg : (Bytecode<C>[]) 
instruction.args()) {
                     branchFunctions.add(compile(arg));
                 }
-                return new UnionFlatMap<>(coefficient, labels, 
branchFunctions);
+                return new UnionBranch<>(coefficient, labels, branchFunctions);
             default:
                 throw new RuntimeException("This is an unknown instruction:" + 
instruction.op());
         }
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/NestedFunction.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/BranchFunction.java
similarity index 64%
copy from 
java/core/src/main/java/org/apache/tinkerpop/machine/functions/NestedFunction.java
copy to 
java/core/src/main/java/org/apache/tinkerpop/machine/functions/BranchFunction.java
index 2119f54..cfebf7a 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/NestedFunction.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/BranchFunction.java
@@ -18,24 +18,16 @@
  */
 package org.apache.tinkerpop.machine.functions;
 
-import org.apache.tinkerpop.machine.processor.ProcessorFactory;
-import org.apache.tinkerpop.machine.traversers.TraverserFactory;
+import org.apache.tinkerpop.machine.traversers.Traverser;
 
+import java.util.Iterator;
 import java.util.List;
+import java.util.function.Function;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public interface NestedFunction<C, S, E> extends CFunction<C> {
+public interface BranchFunction<C, S, E> extends Function<Traverser<C, S>, 
Iterator<Traverser<C, E>>>, InternalFunction<C> {
 
-    public void setProcessor(final TraverserFactory<C> traverserFactory, final 
ProcessorFactory processorFactory);
-
-    public List<List<CFunction<C>>> getFunctions();
-
-    public interface Branching<C, S, E> extends NestedFunction<C, S, E> {
-    }
-
-    public interface Internal<C, S, E> extends NestedFunction<C, S, E> {
-
-    }
+    public List<List<CFunction<C>>> getBranches();
 }
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/NestedFunction.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/InternalFunction.java
similarity index 79%
copy from 
java/core/src/main/java/org/apache/tinkerpop/machine/functions/NestedFunction.java
copy to 
java/core/src/main/java/org/apache/tinkerpop/machine/functions/InternalFunction.java
index 2119f54..e399b19 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/NestedFunction.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/InternalFunction.java
@@ -21,21 +21,10 @@ package org.apache.tinkerpop.machine.functions;
 import org.apache.tinkerpop.machine.processor.ProcessorFactory;
 import org.apache.tinkerpop.machine.traversers.TraverserFactory;
 
-import java.util.List;
-
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public interface NestedFunction<C, S, E> extends CFunction<C> {
+public interface InternalFunction<C> extends CFunction<C> {
 
     public void setProcessor(final TraverserFactory<C> traverserFactory, final 
ProcessorFactory processorFactory);
-
-    public List<List<CFunction<C>>> getFunctions();
-
-    public interface Branching<C, S, E> extends NestedFunction<C, S, E> {
-    }
-
-    public interface Internal<C, S, E> extends NestedFunction<C, S, E> {
-
-    }
 }
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/NestedFunction.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/NestedFunction.java
index 2119f54..f0bc884 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/NestedFunction.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/NestedFunction.java
@@ -18,24 +18,13 @@
  */
 package org.apache.tinkerpop.machine.functions;
 
-import org.apache.tinkerpop.machine.processor.ProcessorFactory;
-import org.apache.tinkerpop.machine.traversers.TraverserFactory;
-
 import java.util.List;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public interface NestedFunction<C, S, E> extends CFunction<C> {
-
-    public void setProcessor(final TraverserFactory<C> traverserFactory, final 
ProcessorFactory processorFactory);
+public interface NestedFunction<C> extends InternalFunction<C> {
 
     public List<List<CFunction<C>>> getFunctions();
 
-    public interface Branching<C, S, E> extends NestedFunction<C, S, E> {
-    }
-
-    public interface Internal<C, S, E> extends NestedFunction<C, S, E> {
-
-    }
 }
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/flatMap/UnionFlatMap.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/UnionBranch.java
similarity index 75%
rename from 
java/core/src/main/java/org/apache/tinkerpop/machine/functions/flatMap/UnionFlatMap.java
rename to 
java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/UnionBranch.java
index 8a5adef..9634b83 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/flatMap/UnionFlatMap.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/UnionBranch.java
@@ -16,18 +16,16 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.tinkerpop.machine.functions.flatMap;
+package org.apache.tinkerpop.machine.functions.branch;
 
 import org.apache.tinkerpop.machine.coefficients.Coefficient;
 import org.apache.tinkerpop.machine.functions.AbstractFunction;
+import org.apache.tinkerpop.machine.functions.BranchFunction;
 import org.apache.tinkerpop.machine.functions.CFunction;
-import org.apache.tinkerpop.machine.functions.FlatMapFunction;
-import org.apache.tinkerpop.machine.functions.NestedFunction;
 import org.apache.tinkerpop.machine.processor.Processor;
 import org.apache.tinkerpop.machine.processor.ProcessorFactory;
 import org.apache.tinkerpop.machine.traversers.Traverser;
 import org.apache.tinkerpop.machine.traversers.TraverserFactory;
-import org.apache.tinkerpop.util.IteratorUtils;
 import org.apache.tinkerpop.util.MultiIterator;
 import org.apache.tinkerpop.util.StringFactory;
 
@@ -39,42 +37,44 @@ import java.util.Set;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class UnionFlatMap<C, S, E> extends AbstractFunction<C, S, 
Iterator<E>> implements FlatMapFunction<C, S, E>, NestedFunction.Branching<C, 
S, E> {
-    // TODO: we need a branch/ package as these need to be NOT flatMap 
functions.
+public final class UnionBranch<C, S, E> extends AbstractFunction<C, S, 
Iterator<Traverser<C, E>>> implements BranchFunction<C, S, E> {
     private final List<List<CFunction<C>>> branchFunctions;
+
+
     private transient List<Processor<C, S, E>> processors;
     private TraverserFactory<C> traverserFactory;
     private ProcessorFactory processorFactory;
 
-    public UnionFlatMap(final Coefficient<C> coefficient, final Set<String> 
labels, final List<List<CFunction<C>>> branchFunctions) {
+    public UnionBranch(final Coefficient<C> coefficient, final Set<String> 
labels, final List<List<CFunction<C>>> branchFunctions) {
         super(coefficient, labels);
         this.branchFunctions = branchFunctions;
     }
 
     @Override
-    public Iterator<E> apply(final Traverser<C, S> traverser) {
+    public Iterator<Traverser<C, E>> apply(final Traverser<C, S> traverser) {
         if (null == this.processors) {
             this.processors = new ArrayList<>(this.branchFunctions.size());
             for (final List<CFunction<C>> functions : this.branchFunctions) {
                 this.processors.add(processorFactory.mint(traverserFactory, 
functions));
             }
         }
-        final MultiIterator<E> iterator = new MultiIterator<>();
+        final MultiIterator<Traverser<C, E>> iterator = new MultiIterator<>();
         for (final Processor<C, S, E> processor : this.processors) {
             processor.reset();
             processor.addStart(traverser.clone());
-            iterator.addIterator(IteratorUtils.map(processor, 
Traverser::object));
+            iterator.addIterator(processor);
         }
-
         return iterator;
     }
 
+    @Override
     public void setProcessor(final TraverserFactory<C> traverserFactory, final 
ProcessorFactory processorFactory) {
         this.traverserFactory = traverserFactory;
         this.processorFactory = processorFactory;
     }
 
-    public List<List<CFunction<C>>> getFunctions() {
+    @Override
+    public List<List<CFunction<C>>> getBranches() {
         return this.branchFunctions;
     }
 
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/FilterFilter.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/FilterFilter.java
index a4a18ec..d247e1c 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/FilterFilter.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/FilterFilter.java
@@ -36,7 +36,7 @@ import java.util.Set;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class FilterFilter<C, S> extends AbstractFunction<C, S, S> 
implements FilterFunction<C, S>, NestedFunction.Internal<C, S, S> {
+public final class FilterFilter<C, S> extends AbstractFunction<C, S, S> 
implements FilterFunction<C, S>, NestedFunction<C> {
 
     private final List<CFunction<C>> filterFunctions;
     private TraverserFactory<C> traverserFactory;
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/MapMap.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/MapMap.java
index 63d64c0..67e30ee 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/MapMap.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/MapMap.java
@@ -36,7 +36,7 @@ import java.util.Set;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class MapMap<C, S, E> extends AbstractFunction<C, S, E> implements 
MapFunction<C, S, E>, NestedFunction.Internal<C, S, E> {
+public class MapMap<C, S, E> extends AbstractFunction<C, S, E> implements 
MapFunction<C, S, E>, NestedFunction<C> {
 
     private final List<CFunction<C>> mapFunctions;
     private TraverserFactory<C> traverserFactory;
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/Path.java 
b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/Path.java
index 9f5b997..857cc1b 100644
--- a/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/Path.java
+++ b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/Path.java
@@ -20,6 +20,7 @@ package org.apache.tinkerpop.machine.traversers;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
@@ -45,6 +46,8 @@ public class Path implements Serializable {
     }
 
     public void addLabels(final Set<String> labels) {
+        if (this.labels.isEmpty())
+            this.labels.add(new HashSet<>());
         this.labels.get(this.labels.size() - 1).addAll(labels);
     }
 
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/Traverser.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/Traverser.java
index 6484d18..59c9752 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/Traverser.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/Traverser.java
@@ -19,6 +19,7 @@
 package org.apache.tinkerpop.machine.traversers;
 
 import org.apache.tinkerpop.machine.coefficients.Coefficient;
+import org.apache.tinkerpop.machine.functions.BranchFunction;
 import org.apache.tinkerpop.machine.functions.CFunction;
 import org.apache.tinkerpop.machine.functions.FilterFunction;
 import org.apache.tinkerpop.machine.functions.FlatMapFunction;
@@ -57,6 +58,10 @@ public interface Traverser<C, S> extends Serializable, 
Cloneable {
         return IteratorUtils.map(function.apply(this), e -> 
this.split(function, e));
     }
 
+    public default <E> Iterator<Traverser<C, E>> branch(final 
BranchFunction<C, S, E> function) {
+        return function.apply(this);
+    }
+
     //public default void sideEffect(final SideEffectFunction<C,S> function);
 
     public default <E> Traverser<C, E> reduce(final ReduceFunction<C, S, E> 
function, final E reducedValue) {
diff --git 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java
index 6531693..7aedadf 100644
--- 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java
+++ 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java
@@ -31,12 +31,13 @@ import org.apache.tinkerpop.machine.bytecode.Bytecode;
 import org.apache.tinkerpop.machine.bytecode.BytecodeUtil;
 import org.apache.tinkerpop.machine.coefficients.Coefficient;
 import org.apache.tinkerpop.machine.coefficients.LongCoefficient;
+import org.apache.tinkerpop.machine.functions.BranchFunction;
 import org.apache.tinkerpop.machine.functions.CFunction;
 import org.apache.tinkerpop.machine.functions.FilterFunction;
 import org.apache.tinkerpop.machine.functions.FlatMapFunction;
 import org.apache.tinkerpop.machine.functions.InitialFunction;
+import org.apache.tinkerpop.machine.functions.InternalFunction;
 import org.apache.tinkerpop.machine.functions.MapFunction;
-import org.apache.tinkerpop.machine.functions.NestedFunction;
 import org.apache.tinkerpop.machine.functions.ReduceFunction;
 import org.apache.tinkerpop.machine.pipes.PipesProcessor;
 import org.apache.tinkerpop.machine.processor.Processor;
@@ -75,11 +76,11 @@ public class Beam<C, S, E> implements Processor<C, S, E> {
 
     private PCollection<Traverser<C, ?>> 
processFunction(PCollection<Traverser<C, ?>> collection, final CFunction<?> 
function, final boolean branching) {
         DoFn<Traverser<C, S>, Traverser<C, E>> fn = null;
-        if (function instanceof NestedFunction.Internal)
-            ((NestedFunction<C, ?, ?>) 
function).setProcessor(this.traverserFactory, new PipesProcessor());
+        if (function instanceof InternalFunction)
+            ((InternalFunction<C>) 
function).setProcessor(this.traverserFactory, new PipesProcessor());
 
-        if (function instanceof NestedFunction.Branching) {
-            final List<List<CFunction<C>>> branches = 
((NestedFunction.Branching) function).getFunctions();
+        if (function instanceof BranchFunction) {
+            final List<List<CFunction<C>>> branches = ((BranchFunction) 
function).getBranches();
             final List<PCollection<Traverser<C, ?>>> collections = new 
ArrayList<>(branches.size());
             for (final List<CFunction<C>> branch : branches) {
                 PCollection<Traverser<C, ?>> branchCollection = collection;
@@ -89,7 +90,7 @@ public class Beam<C, S, E> implements Processor<C, S, E> {
                 collections.add(branchCollection);
             }
             collection = 
PCollectionList.of(collections).apply(Flatten.pCollections());
-            this.functions.add(new FlatMapFn<>((FlatMapFunction<C, S, E>) 
function));
+            this.functions.add(new BranchFn<>((BranchFunction<C, S, E>) 
function));
         } else if (function instanceof InitialFunction) {
             fn = new InitialFn((InitialFunction<C, S>) function, 
this.traverserFactory);
         } else if (function instanceof FilterFunction) {
@@ -105,7 +106,7 @@ public class Beam<C, S, E> implements Processor<C, S, E> {
         } else
             throw new RuntimeException("You need a new step type:" + function);
 
-        if (!(function instanceof ReduceFunction) && !(function instanceof 
NestedFunction.Branching)) {
+        if (!(function instanceof ReduceFunction) && !(function instanceof 
BranchFunction)) {
             if (!branching)
                 this.functions.add((Fn) fn);
             collection = (PCollection<Traverser<C, ?>>) 
collection.apply(ParDo.of((DoFn) fn));
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/NestedFunction.java
 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BranchFn.java
similarity index 54%
copy from 
java/core/src/main/java/org/apache/tinkerpop/machine/functions/NestedFunction.java
copy to 
java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BranchFn.java
index 2119f54..40aec28 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/NestedFunction.java
+++ 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BranchFn.java
@@ -16,26 +16,29 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.tinkerpop.machine.functions;
+package org.apache.tinkerpop.machine.beam;
 
-import org.apache.tinkerpop.machine.processor.ProcessorFactory;
-import org.apache.tinkerpop.machine.traversers.TraverserFactory;
+import org.apache.tinkerpop.machine.functions.BranchFunction;
+import org.apache.tinkerpop.machine.traversers.Traverser;
 
-import java.util.List;
+import java.util.Iterator;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public interface NestedFunction<C, S, E> extends CFunction<C> {
+public class BranchFn<C, S, E> extends AbstractFn<C, S, E> {
 
-    public void setProcessor(final TraverserFactory<C> traverserFactory, final 
ProcessorFactory processorFactory);
+    private BranchFunction<C, S, E> branchFunction;
 
-    public List<List<CFunction<C>>> getFunctions();
-
-    public interface Branching<C, S, E> extends NestedFunction<C, S, E> {
+    public BranchFn(final BranchFunction<C, S, E> branchFunction) {
+        super(branchFunction);
+        this.branchFunction = branchFunction;
     }
 
-    public interface Internal<C, S, E> extends NestedFunction<C, S, E> {
-
+    @ProcessElement
+    public void processElement(final @Element Traverser<C, S> traverser, final 
OutputReceiver<Traverser<C, E>> output) {
+        Iterator<Traverser<C, E>> iterator = 
traverser.branch(this.branchFunction);
+        while (iterator.hasNext())
+            output.output(iterator.next());
     }
-}
+}
\ No newline at end of file
diff --git a/java/machine/pipes/pom.xml b/java/machine/pipes/pom.xml
index 3d40c2a..2c4008f 100644
--- a/java/machine/pipes/pom.xml
+++ b/java/machine/pipes/pom.xml
@@ -29,6 +29,13 @@ limitations under the License.
             <artifactId>core</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <!-- TESTING -->
+        <dependency>
+            <groupId>org.apache.tinkerpop</groupId>
+            <artifactId>test</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
     <build>
         <directory>${basedir}/target</directory>
diff --git 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/FlatMapStep.java
 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/BranchStep.java
similarity index 63%
copy from 
java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/FlatMapStep.java
copy to 
java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/BranchStep.java
index 468672a..9de62ae 100644
--- 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/FlatMapStep.java
+++ 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/BranchStep.java
@@ -18,7 +18,7 @@
  */
 package org.apache.tinkerpop.machine.pipes;
 
-import org.apache.tinkerpop.machine.functions.FlatMapFunction;
+import org.apache.tinkerpop.machine.functions.BranchFunction;
 import org.apache.tinkerpop.machine.traversers.Traverser;
 
 import java.util.Collections;
@@ -27,25 +27,32 @@ import java.util.Iterator;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class FlatMapStep<C, S, E> extends AbstractStep<C, S, E> {
+public class BranchStep<C, S, E> extends AbstractStep<C, S, E> {
 
-    private final FlatMapFunction<C, S, E> flatMapFunction;
+    private final BranchFunction<C, S, E> branchFunction;
     private Iterator<Traverser<C, E>> iterator = Collections.emptyIterator();
 
-    public FlatMapStep(final AbstractStep<C, ?, S> previousStep, final 
FlatMapFunction<C, S, E> flatMapFunction) {
-        super(previousStep, flatMapFunction);
-        this.flatMapFunction = flatMapFunction;
+    public BranchStep(final AbstractStep<C, ?, S> previousStep, final 
BranchFunction<C, S, E> branchFunction) {
+        super(previousStep, branchFunction);
+        this.branchFunction = branchFunction;
     }
 
     @Override
     public boolean hasNext() {
-        return this.iterator.hasNext() || super.hasNext();
+        while (true) {
+            if (this.iterator.hasNext())
+                return true;
+            else if (super.hasNext())
+                this.iterator = 
super.getPreviousTraverser().branch(this.branchFunction);
+            else
+                return false;
+        }
     }
 
     @Override
     public Traverser<C, E> next() {
         if (!this.iterator.hasNext()) {
-            this.iterator = 
super.getPreviousTraverser().flatMap(this.flatMapFunction);
+            this.iterator = 
super.getPreviousTraverser().branch(this.branchFunction);
         }
         return this.iterator.next();
     }
diff --git 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/FlatMapStep.java
 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/FlatMapStep.java
index 468672a..1dcd090 100644
--- 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/FlatMapStep.java
+++ 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/FlatMapStep.java
@@ -39,7 +39,14 @@ public class FlatMapStep<C, S, E> extends AbstractStep<C, S, 
E> {
 
     @Override
     public boolean hasNext() {
-        return this.iterator.hasNext() || super.hasNext();
+        while (true) {
+            if (this.iterator.hasNext())
+                return true;
+            else if (super.hasNext())
+                this.iterator = 
super.getPreviousTraverser().flatMap(this.flatMapFunction);
+            else
+                return false;
+        }
     }
 
     @Override
diff --git 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java
 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java
index 7cd8872..a413b0f 100644
--- 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java
+++ 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java
@@ -18,12 +18,13 @@
  */
 package org.apache.tinkerpop.machine.pipes;
 
+import org.apache.tinkerpop.machine.functions.BranchFunction;
 import org.apache.tinkerpop.machine.functions.CFunction;
 import org.apache.tinkerpop.machine.functions.FilterFunction;
 import org.apache.tinkerpop.machine.functions.FlatMapFunction;
 import org.apache.tinkerpop.machine.functions.InitialFunction;
+import org.apache.tinkerpop.machine.functions.InternalFunction;
 import org.apache.tinkerpop.machine.functions.MapFunction;
-import org.apache.tinkerpop.machine.functions.NestedFunction;
 import org.apache.tinkerpop.machine.functions.ReduceFunction;
 import org.apache.tinkerpop.machine.pipes.util.BasicReducer;
 import org.apache.tinkerpop.machine.processor.Processor;
@@ -45,11 +46,13 @@ public class Pipes<C, S, E> implements Processor<C, S, E> {
     public Pipes(final TraverserFactory<C> traverserFactory, final 
List<CFunction<C>> functions) {
         AbstractStep<C, ?, ?> previousStep = EmptyStep.instance();
         for (final CFunction<?> function : functions) {
-            if (function instanceof NestedFunction)
-                ((NestedFunction<C, ?, ?>) 
function).setProcessor(traverserFactory, new PipesProcessor());
+            if (function instanceof InternalFunction)
+                ((InternalFunction<C>) 
function).setProcessor(traverserFactory, new PipesProcessor());
             /////////
             final AbstractStep nextStep;
-            if (function instanceof FilterFunction)
+            if (function instanceof BranchFunction)
+                nextStep = new BranchStep(previousStep, (BranchFunction<C, ?, 
?>) function);
+            else if (function instanceof FilterFunction)
                 nextStep = new FilterStep(previousStep, (FilterFunction<C, ?>) 
function);
             else if (function instanceof FlatMapFunction)
                 nextStep = new FlatMapStep(previousStep, (FlatMapFunction<C, 
?, ?>) function);
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/NestedFunction.java
 
b/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesIdentityTest.java
similarity index 57%
copy from 
java/core/src/main/java/org/apache/tinkerpop/machine/functions/NestedFunction.java
copy to 
java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesIdentityTest.java
index 2119f54..88b833d 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/NestedFunction.java
+++ 
b/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesIdentityTest.java
@@ -16,26 +16,24 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.tinkerpop.machine.functions;
+package org.apache.tinkerpop.machine.pipes;
 
-import org.apache.tinkerpop.machine.processor.ProcessorFactory;
-import org.apache.tinkerpop.machine.traversers.TraverserFactory;
-
-import java.util.List;
+import org.apache.tinkerpop.language.Gremlin;
+import org.apache.tinkerpop.language.TraversalSource;
+import org.apache.tinkerpop.machine.coefficients.LongCoefficient;
+import org.apache.tinkerpop.machine.functions.filters.IdentityTest;
+import org.junit.jupiter.api.Test;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public interface NestedFunction<C, S, E> extends CFunction<C> {
-
-    public void setProcessor(final TraverserFactory<C> traverserFactory, final 
ProcessorFactory processorFactory);
-
-    public List<List<CFunction<C>>> getFunctions();
-
-    public interface Branching<C, S, E> extends NestedFunction<C, S, E> {
-    }
+public class PipesIdentityTest {
 
-    public interface Internal<C, S, E> extends NestedFunction<C, S, E> {
+    private TraversalSource<Long> g = 
Gremlin.<Long>traversal().withProcessor(PipesProcessor.class).withCoefficient(LongCoefficient.class);
+    private IdentityTest test = new IdentityTest();
 
+    @Test
+    public void g_injectX2X_identity() {
+        test.g_injectX2X_identity(g.inject(2L).identity());
     }
 }
diff --git 
a/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java
 
b/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java
index 49a802a..28b24c9 100644
--- 
a/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java
+++ 
b/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java
@@ -39,7 +39,7 @@ public class PipesTest {
                 .withProcessor(PipesProcessor.class)
                 .withStrategy(IdentityStrategy.class);
 
-        Traversal<Long, Long, ?> traversal = g.inject(7L, 10L, 
12L).as("a").c(3L).map(__.incr()).identity().incr().identity().identity().sum().path();
+        Traversal<Long, Long, ?> traversal = g.inject(7L, 10L, 
12L).as("a").c(3L).map(__.incr()).identity().incr().identity().identity().sum().path().identity();
         System.out.println(TraversalUtil.getBytecode(traversal));
         System.out.println(traversal);
         System.out.println(traversal.toList());
@@ -49,7 +49,7 @@ public class PipesTest {
         System.out.println(traversal);
         System.out.println(traversal.toList());
         System.out.println("\n----------\n");
-        traversal = g.inject(7L).union(__.<Long>incr().incr().count(), 
__.<Long, Long>c(10L).incr().sum()).count();
+        traversal = g.inject(7L).union(__.<Long>incr().incr().count(), 
__.<Long, Long>c(10L).incr().sum());
         System.out.println(TraversalUtil.getBytecode(traversal));
         System.out.println(traversal);
         System.out.println(traversal.toList());
diff --git a/java/pom.xml b/java/pom.xml
index 3887958..67ab0d3 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -66,6 +66,7 @@ limitations under the License.
     <modules>
         <module>core</module>
         <module>machine</module>
+        <module>test</module>
     </modules>
     <scm>
         <connection>scm:git:git@github.com:apache/tinkerpop.git</connection>
diff --git a/java/machine/pipes/pom.xml b/java/test/pom.xml
similarity index 79%
copy from java/machine/pipes/pom.xml
copy to java/test/pom.xml
index 3d40c2a..4f357a1 100644
--- a/java/machine/pipes/pom.xml
+++ b/java/test/pom.xml
@@ -16,19 +16,30 @@ limitations under the License.
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
+
     <parent>
-        <artifactId>machine</artifactId>
+        <artifactId>java</artifactId>
         <groupId>org.apache.tinkerpop</groupId>
         <version>4.0.0-SNAPSHOT</version>
     </parent>
-    <name>Apache TinkerPop (Java) :: Machine :: Pipes</name>
-    <artifactId>pipes</artifactId>
+    <name>Apache TinkerPop (Java) :: Test</name>
+    <artifactId>test</artifactId>
     <dependencies>
         <dependency>
             <groupId>org.apache.tinkerpop</groupId>
             <artifactId>core</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-api</artifactId>
+            <version>${junit.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-engine</artifactId>
+            <version>${junit.version}</version>
+        </dependency>
     </dependencies>
     <build>
         <directory>${basedir}/target</directory>
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/NestedFunction.java
 
b/java/test/src/main/java/org/apache/tinkerpop/machine/functions/filters/IdentityTest.java
similarity index 61%
copy from 
java/core/src/main/java/org/apache/tinkerpop/machine/functions/NestedFunction.java
copy to 
java/test/src/main/java/org/apache/tinkerpop/machine/functions/filters/IdentityTest.java
index 2119f54..45d0515 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/NestedFunction.java
+++ 
b/java/test/src/main/java/org/apache/tinkerpop/machine/functions/filters/IdentityTest.java
@@ -16,26 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.tinkerpop.machine.functions;
+package org.apache.tinkerpop.machine.functions.filters;
 
-import org.apache.tinkerpop.machine.processor.ProcessorFactory;
-import org.apache.tinkerpop.machine.traversers.TraverserFactory;
+import org.apache.tinkerpop.language.Traversal;
 
 import java.util.List;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public interface NestedFunction<C, S, E> extends CFunction<C> {
-
-    public void setProcessor(final TraverserFactory<C> traverserFactory, final 
ProcessorFactory processorFactory);
-
-    public List<List<CFunction<C>>> getFunctions();
-
-    public interface Branching<C, S, E> extends NestedFunction<C, S, E> {
-    }
-
-    public interface Internal<C, S, E> extends NestedFunction<C, S, E> {
+public class IdentityTest {
 
+    public void g_injectX2X_identity(final Traversal<Long, Long, Long> 
traversal) {
+        final List<Long> list = traversal.toList();
+        assertEquals(1, list.size());
+        assertEquals(2L, list.get(0));
     }
 }

Reply via email to