This is an automated email from the ASF dual-hosted git repository.

okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/tp4 by this push:
     new 5b2934a  per discussions with @dkuppitz, working towards making 
bytecode map, flatmap, branch, reduce, filter instructions ONLY. The TP4 
Machine supports a very tiny instruction set that can, at compile time, be 
strategized into processor specific functions (optimizations). This is crazy 
stuff, but if it works, it will be brilliant.
5b2934a is described below

commit 5b2934af091e5acdccfeaa8b3e602dc1456a1cd7
Author: Marko A. Rodriguez <okramma...@gmail.com>
AuthorDate: Sat Mar 23 05:37:26 2019 -0600

    per discussions with @dkuppitz, working towards making bytecode map, 
flatmap, branch, reduce, filter instructions ONLY. The TP4 Machine supports a 
very tiny instruction set that can, at compile time, be strategized into 
processor specific functions (optimizations). This is crazy stuff, but if it 
works, it will be brilliant.
---
 .../tinkerpop/language/gremlin/Traversal.java      | 12 ++--
 .../language/gremlin/TraversalSource.java          |  2 +-
 .../tinkerpop/machine/bytecode/Argument.java       |  6 +-
 .../tinkerpop/machine/bytecode/CoreCompiler.java   | 43 ++++---------
 .../tinkerpop/machine/function/BranchFunction.java |  5 +-
 .../machine/function/barrier/JoinBarrier.java      |  2 +-
 .../branch/{UnionBranch.java => BranchBranch.java} | 39 ++++++------
 .../machine/function/branch/IfBranch.java          | 72 ----------------------
 .../machine/function/filter/FilterFilter.java      | 12 ++--
 .../machine/function/filter/HasKeyFilter.java      |  4 +-
 .../machine/function/filter/HasKeyValueFilter.java |  2 +-
 .../machine/function/filter/IdentityFilter.java    | 41 ------------
 .../machine/function/filter/IsFilter.java          |  2 +-
 .../tinkerpop/machine/function/map/MapMap.java     | 12 ++--
 .../strategy/decoration/ExplainStrategy.java       |  2 +-
 .../strategy/optimization/IdentityStrategy.java    |  3 +-
 .../machine/bytecode/InstructionTest.java          | 10 +--
 .../machine/bytecode/SourceInstructionTest.java    |  8 +--
 .../tinkerpop/machine/processor/beam/BranchFn.java | 36 +++++++----
 .../machine/processor/beam/util/TopologyUtil.java  | 45 ++++++++------
 .../tinkerpop/machine/processor/beam/BeamTest.java |  5 +-
 .../machine/processor/pipes/BranchStep.java        | 30 +++++----
 .../tinkerpop/machine/processor/pipes/Pipes.java   |  2 +-
 .../machine/processor/pipes/PipesTest.java         |  7 ++-
 24 files changed, 152 insertions(+), 250 deletions(-)

diff --git 
a/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/Traversal.java
 
b/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/Traversal.java
index 15adece..783f35f 100644
--- 
a/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/Traversal.java
+++ 
b/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/Traversal.java
@@ -87,17 +87,17 @@ public class Traversal<C, S, E> implements Iterator<E> {
     }
 
     public <R> Traversal<C, S, R> choose(final Traversal<C, E, ?> predicate, 
final Traversal<C, S, R> trueTraversal, final Traversal<C, S, R> 
falseTraversal) {
-        this.bytecode.addInstruction(this.currentCoefficient, Symbols.IF, 
predicate.bytecode, trueTraversal.bytecode, falseTraversal.bytecode);
+        this.bytecode.addInstruction(this.currentCoefficient, Symbols.BRANCH, 
predicate.bytecode, trueTraversal.bytecode, Symbols.DEFAULT, 
falseTraversal.bytecode);
         return (Traversal) this;
     }
 
     public <R> Traversal<C, S, R> choose(final Traversal<C, E, ?> predicate, 
final Traversal<C, S, R> trueTraversal) {
-        this.bytecode.addInstruction(this.currentCoefficient, Symbols.IF, 
predicate.bytecode, trueTraversal.bytecode);
+        this.bytecode.addInstruction(this.currentCoefficient, Symbols.BRANCH, 
predicate.bytecode, trueTraversal.bytecode);
         return (Traversal) this;
     }
 
     public <R> Traversal<C, S, R> constant(final R constant) {
-        this.bytecode.addInstruction(this.currentCoefficient, 
Symbols.CONSTANT, constant);
+        this.bytecode.addInstruction(this.currentCoefficient, Symbols.MAP, 
constant);
         return (Traversal) this;
     }
 
@@ -168,7 +168,7 @@ public class Traversal<C, S, E> implements Iterator<E> {
     }
 
     public Traversal<C, S, E> identity() {
-        this.bytecode.addInstruction(this.currentCoefficient, 
Symbols.IDENTITY);
+        this.bytecode.addInstruction(this.currentCoefficient, Symbols.FILTER, 
Boolean.TRUE);
         return this;
     }
 
@@ -243,12 +243,12 @@ public class Traversal<C, S, E> implements Iterator<E> {
     // TODO: for some reason var args are not working...Java11
 
     public <R> Traversal<C, S, R> union(final Traversal<C, E, R> traversalA, 
final Traversal<C, E, R> traversalB) {
-        this.bytecode.addInstruction(this.currentCoefficient, Symbols.UNION, 
traversalA.bytecode, traversalB.bytecode);
+        this.bytecode.addInstruction(this.currentCoefficient, Symbols.BRANCH, 
Symbols.DEFAULT, traversalA.bytecode, Symbols.DEFAULT, traversalB.bytecode);
         return (Traversal) this;
     }
 
     public <R> Traversal<C, S, R> union(final Traversal<C, E, R> traversalA, 
final Traversal<C, E, R> traversalB, final Traversal<C, E, R> traversalC) {
-        this.bytecode.addInstruction(this.currentCoefficient, Symbols.UNION, 
traversalA.bytecode, traversalB.bytecode, traversalC.bytecode);
+        this.bytecode.addInstruction(this.currentCoefficient, Symbols.BRANCH, 
Symbols.DEFAULT, traversalA.bytecode, Symbols.DEFAULT, traversalB.bytecode, 
Symbols.DEFAULT, traversalC.bytecode);
         return (Traversal) this;
     }
 
diff --git 
a/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/TraversalSource.java
 
b/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/TraversalSource.java
index 594ab8b..f08de1f 100644
--- 
a/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/TraversalSource.java
+++ 
b/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/TraversalSource.java
@@ -75,7 +75,7 @@ public class TraversalSource<C> implements Cloneable {
     public <S> Traversal<C, S, S> inject(final S... objects) {
         final Bytecode<C> bytecode = this.bytecode.clone();
         final Coefficient<C> coefficient = this.coefficient.clone();
-        bytecode.addInstruction(coefficient, Symbols.INJECT, objects);
+        bytecode.addInstruction(coefficient, Symbols.INITIAL, objects);
         return new Traversal<>(coefficient, bytecode);
     }
 
diff --git 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/Argument.java
 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/Argument.java
index bbae38d..0859810 100644
--- 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/Argument.java
+++ 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/Argument.java
@@ -40,10 +40,14 @@ public final class Argument<E> implements Serializable {
     }
 
 
-    public final <C, S> E getArg(final Traverser<C, S> traverser) {
+    public final <C, S> E mapArg(final Traverser<C, S> traverser) {
         return this.isPrimitive ? this.arg : this.<C, 
S>getCompilation().mapTraverser(traverser).object();
     }
 
+    public final <C, S> boolean filterArg(final Traverser<C, S> traverser) {
+        return this.isPrimitive ? (Boolean) this.arg : this.<C, 
S>getCompilation().filterTraverser(traverser);
+    }
+
     public static <E> Argument<E> create(final Object arg) {
         return new Argument<>(arg);
     }
diff --git 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/CoreCompiler.java
 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/CoreCompiler.java
index e875ff1..6d6652f 100644
--- 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/CoreCompiler.java
+++ 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/CoreCompiler.java
@@ -22,17 +22,14 @@ import org.apache.tinkerpop.machine.coefficient.Coefficient;
 import org.apache.tinkerpop.machine.function.CFunction;
 import org.apache.tinkerpop.machine.function.barrier.JoinBarrier;
 import org.apache.tinkerpop.machine.function.barrier.StallBarrier;
-import org.apache.tinkerpop.machine.function.branch.IfBranch;
+import org.apache.tinkerpop.machine.function.branch.BranchBranch;
 import org.apache.tinkerpop.machine.function.branch.RepeatBranch;
-import org.apache.tinkerpop.machine.function.branch.UnionBranch;
 import org.apache.tinkerpop.machine.function.filter.FilterFilter;
 import org.apache.tinkerpop.machine.function.filter.HasKeyFilter;
 import org.apache.tinkerpop.machine.function.filter.HasKeyValueFilter;
-import org.apache.tinkerpop.machine.function.filter.IdentityFilter;
 import org.apache.tinkerpop.machine.function.filter.IsFilter;
 import org.apache.tinkerpop.machine.function.flatmap.UnfoldFlatMap;
 import org.apache.tinkerpop.machine.function.initial.InjectInitial;
-import org.apache.tinkerpop.machine.function.map.ConstantMap;
 import org.apache.tinkerpop.machine.function.map.IncrMap;
 import org.apache.tinkerpop.machine.function.map.LoopsMap;
 import org.apache.tinkerpop.machine.function.map.MapMap;
@@ -62,16 +59,14 @@ public final class CoreCompiler implements BytecodeCompiler 
{
 
     private static final Map<String, FunctionType> OP_TYPES = new HashMap<>() 
{{
         put(Symbols.BARRIER, FunctionType.BARRIER);
-        put(Symbols.IF, FunctionType.BRANCH);
-        put(Symbols.CONSTANT, FunctionType.MAP);
+        put(Symbols.BRANCH, FunctionType.BRANCH);
         put(Symbols.COUNT, FunctionType.REDUCE);
         put(Symbols.FILTER, FunctionType.FILTER);
         put(Symbols.GROUP_COUNT, FunctionType.REDUCE);
         put(Symbols.HAS_KEY, FunctionType.FILTER);
         put(Symbols.HAS_KEY_VALUE, FunctionType.FILTER);
-        put(Symbols.IDENTITY, FunctionType.FILTER);
         put(Symbols.INCR, FunctionType.MAP);
-        put(Symbols.INJECT, FunctionType.INITIAL);
+        put(Symbols.INITIAL, FunctionType.INITIAL);
         put(Symbols.IS, FunctionType.FILTER);
         put(Symbols.JOIN, FunctionType.BARRIER);
         put(Symbols.LOOPS, FunctionType.MAP);
@@ -92,26 +87,19 @@ public final class CoreCompiler implements BytecodeCompiler 
{
         switch (op) {
             case Symbols.BARRIER:
                 return new StallBarrier<>(coefficient, labels, 1000);
-            case Symbols.IF:
-                return new IfBranch<>(coefficient, labels,
-                        Compilation.compileOne(instruction.args()[0]),
-                        Compilation.compileOne(instruction.args()[1]),
-                        Compilation.compileOrNull(2, instruction.args()));
-            case Symbols.CONSTANT:
-                return new ConstantMap<>(coefficient, labels, 
instruction.args()[0]);
+            case Symbols.BRANCH:
+                return new BranchBranch<>(coefficient, labels, 
BranchBranch.makeBranches(instruction.args()));
             case Symbols.COUNT:
                 return new CountReduce<>(coefficient, labels);
             case Symbols.FILTER:
-                return new FilterFilter<>(coefficient, labels, 
Compilation.compileOne(instruction.args()[0]));
+                return new FilterFilter<>(coefficient, labels, 
Argument.create(instruction.args()[0]));
             case Symbols.GROUP_COUNT:
                 return new GroupCountReduce<>(coefficient, labels, 
Compilation.compileOrNull(0, instruction.args()));
             case Symbols.HAS_KEY:
                 return new HasKeyFilter<>(coefficient, labels, 
Pred.valueOf(instruction.args()[0]), Argument.create(instruction.args()[1]));
             case Symbols.HAS_KEY_VALUE:
                 return new HasKeyValueFilter<>(coefficient, labels, 
Argument.create(instruction.args()[0]), Argument.create(instruction.args()[1]));
-            case Symbols.IDENTITY:
-                return new IdentityFilter<>(coefficient, labels);
-            case Symbols.INJECT:
+            case Symbols.INITIAL:
                 return new InjectInitial<>(coefficient, labels, 
instruction.args());
             case Symbols.IS:
                 return new IsFilter<>(coefficient, labels, 
Pred.valueOf(instruction.args()[0]), Argument.create(instruction.args()[1]));
@@ -122,7 +110,7 @@ public final class CoreCompiler implements BytecodeCompiler 
{
             case Symbols.LOOPS:
                 return new LoopsMap<>(coefficient, labels);
             case Symbols.MAP:
-                return new MapMap<>(coefficient, labels, 
Compilation.compileOne(instruction.args()[0]));
+                return new MapMap<>(coefficient, labels, 
Argument.create(instruction.args()[0]));
             case Symbols.PATH:
                 return new PathMap<>(coefficient, labels, 
Compilation.compile(instruction.args()));
             case Symbols.REPEAT:
@@ -131,8 +119,6 @@ public final class CoreCompiler implements BytecodeCompiler 
{
                 return new SumReduce<>(coefficient, labels);
             case Symbols.UNFOLD:
                 return new UnfoldFlatMap<>(coefficient, labels);
-            case Symbols.UNION:
-                return new UnionBranch<>(coefficient, labels, 
Compilation.compile(instruction.args()));
             default:
                 return null;
         }
@@ -169,12 +155,12 @@ public final class CoreCompiler implements 
BytecodeCompiler {
         public static final String GTE = "gte";
         public static final String REGEX = "regex";
 
+        // BRANCH TOKENS
+        public static final String DEFAULT = "default";
+
         // INSTRUCTION OPS
         public static final String BARRIER = "barrier";
-        // [if,[bc],[bc],?[bc]]
-        public static final String IF = "if";
-        // [constant, o]
-        public static final String CONSTANT = "constant";
+        public static final String BRANCH = "branch";
         // [count]
         public static final String COUNT = "count";
         public static final String EXPLAIN = "explain";
@@ -185,12 +171,9 @@ public final class CoreCompiler implements 
BytecodeCompiler {
         // [hasKey, pred, o | [bc]]
         public static final String HAS_KEY = "hasKey";
         public static final String HAS_KEY_VALUE = "hasKeyValue";
-        // [identity]
-        public static final String IDENTITY = "identity";
         // [incr]
         public static final String INCR = "incr";
-        // [inject, o*]
-        public static final String INJECT = "inject";
+        public static final String INITIAL = "initial";
         // [is, pred, o | [bc]]
         public static final String IS = "is";
         public static final String JOIN = "join";
diff --git 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/BranchFunction.java
 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/BranchFunction.java
index d54ba8b..f3b460d 100644
--- 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/BranchFunction.java
+++ 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/BranchFunction.java
@@ -26,9 +26,8 @@ import java.util.Map;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public interface BranchFunction<C, S, E, M> extends CFunction<C> {
+public interface BranchFunction<C, S, E> extends CFunction<C> {
 
-    public Compilation<C, S, M> getBranchSelector();
+    public Map<Compilation<C, S, ?>, List<Compilation<C, S, E>>> getBranches();
 
-    public Map<M, List<Compilation<C, S, E>>> getBranches();
 }
diff --git 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/barrier/JoinBarrier.java
 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/barrier/JoinBarrier.java
index 56ec7d1..19a01fc 100644
--- 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/barrier/JoinBarrier.java
+++ 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/barrier/JoinBarrier.java
@@ -71,7 +71,7 @@ public class JoinBarrier<C, K, V> extends AbstractFunction<C> 
implements Barrier
     @Override
     public List<Map<K, V>> apply(final Traverser<C, Map<K, V>> traverser, 
final List<Map<K, V>> barrier) {
         
this.joinCompilation.flatMapTraverser(traverser).forEachRemaining(other -> {
-            final K key = this.joinKey.getArg(traverser);
+            final K key = this.joinKey.mapArg(traverser);
             if (traverser.object().get(key).equals(other.object().get(key))) {
                 final Map<K, V> join = new HashMap<>();
                 for (final Map.Entry<K, V> entry : 
traverser.object().entrySet()) {
diff --git 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/branch/UnionBranch.java
 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/branch/BranchBranch.java
similarity index 51%
rename from 
java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/branch/UnionBranch.java
rename to 
java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/branch/BranchBranch.java
index f667760..fd9c8a4 100644
--- 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/branch/UnionBranch.java
+++ 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/branch/BranchBranch.java
@@ -19,12 +19,12 @@
 package org.apache.tinkerpop.machine.function.branch;
 
 import org.apache.tinkerpop.machine.bytecode.Compilation;
+import org.apache.tinkerpop.machine.bytecode.CoreCompiler.Symbols;
 import org.apache.tinkerpop.machine.coefficient.Coefficient;
 import org.apache.tinkerpop.machine.function.AbstractFunction;
 import org.apache.tinkerpop.machine.function.BranchFunction;
-import org.apache.tinkerpop.machine.processor.ConstantProcessor;
-import org.apache.tinkerpop.machine.util.StringFactory;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -33,29 +33,32 @@ import java.util.Set;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class UnionBranch<C, S, E> extends AbstractFunction<C> implements 
BranchFunction<C, S, E, Boolean> {
+public final class BranchBranch<C, S, E> extends AbstractFunction<C> 
implements BranchFunction<C, S, E> {
 
-    private final Map<Boolean, List<Compilation<C, S, E>>> branches;
-    private final Compilation<C, S, Boolean> branchSelector = new 
Compilation<>(new ConstantProcessor<>(Boolean.TRUE));
+    private final Map<Compilation<C, S, ?>, List<Compilation<C, S, E>>> 
branches;
 
-    public UnionBranch(final Coefficient<C> coefficient, final Set<String> 
labels, final List<Compilation<C, S, E>> branches) {
+    public BranchBranch(final Coefficient<C> coefficient, final Set<String> 
labels, final Map<Compilation<C, S, ?>, List<Compilation<C, S, E>>> branches) {
         super(coefficient, labels);
-        this.branches = new HashMap<>();
-        this.branches.put(Boolean.TRUE, branches);
+        this.branches = branches;
     }
 
     @Override
-    public String toString() {
-        return StringFactory.makeFunctionString(this, 
this.branches.values().iterator().next()); // make a flat array
-    }
-
-    @Override
-    public Compilation<C, S, Boolean> getBranchSelector() {
-        return this.branchSelector;
+    public Map<Compilation<C, S, ?>, List<Compilation<C, S, E>>> getBranches() 
{
+        return this.branches;
     }
 
-    @Override
-    public Map<Boolean, List<Compilation<C, S, E>>> getBranches() {
-        return this.branches;
+    public static <C, S, E> Map<Compilation<C, S, ?>, List<Compilation<C, S, 
E>>> makeBranches(final Object... args) {
+        final Map<Compilation<C, S, ?>, List<Compilation<C, S, E>>> branches = 
new HashMap<>();
+        for (int i = 0; i < args.length; i = i + 2) {
+            final Compilation<C, S, ?> predicate = 
Symbols.DEFAULT.equals(args[i]) ? null : Compilation.compileOne(args[i]);
+            final Compilation<C, S, E> branch = Compilation.compileOne(args[i 
+ 1]);
+            List<Compilation<C, S, E>> list = branches.get(predicate);
+            if (null == list) {
+                list = new ArrayList<>();
+                branches.put(predicate, list);
+            }
+            list.add(branch);
+        }
+        return branches;
     }
 }
diff --git 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/branch/IfBranch.java
 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/branch/IfBranch.java
deleted file mode 100644
index 66612de..0000000
--- 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/branch/IfBranch.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.machine.function.branch;
-
-import org.apache.tinkerpop.machine.bytecode.Compilation;
-import org.apache.tinkerpop.machine.coefficient.Coefficient;
-import org.apache.tinkerpop.machine.function.AbstractFunction;
-import org.apache.tinkerpop.machine.function.BranchFunction;
-import org.apache.tinkerpop.machine.processor.HasNextProcessor;
-import org.apache.tinkerpop.machine.util.StringFactory;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class IfBranch<C, S, E> extends AbstractFunction<C> implements 
BranchFunction<C, S, E, Boolean> {
-
-    private final Compilation<C, S, Boolean> branchSelector;
-    private final Map<Boolean, List<Compilation<C, S, E>>> branches;
-    /////
-    private final Compilation<C, S, ?> predicate;
-
-
-    public IfBranch(final Coefficient<C> coefficient, final Set<String> labels,
-                    final Compilation<C, S, ?> predicate,
-                    final Compilation<C, S, E> trueBranch,
-                    final Compilation<C, S, E> falseBranch) {
-        super(coefficient, labels);
-        this.predicate = predicate;
-        this.branchSelector = new Compilation<>(new 
HasNextProcessor<>(predicate));
-        this.branches = new HashMap<>();
-        this.branches.put(Boolean.TRUE, Collections.singletonList(trueBranch));
-        if (null != falseBranch)
-            this.branches.put(Boolean.FALSE, 
Collections.singletonList(falseBranch));
-    }
-
-    @Override
-    public String toString() {
-        return StringFactory.makeFunctionString(this, this.predicate, 
this.branches);
-    }
-
-    @Override
-    public Compilation<C, S, Boolean> getBranchSelector() {
-        return this.branchSelector;
-    }
-
-    @Override
-    public Map<Boolean, List<Compilation<C, S, E>>> getBranches() {
-        return this.branches;
-    }
-}
diff --git 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/filter/FilterFilter.java
 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/filter/FilterFilter.java
index 0cc1502..79a8e78 100644
--- 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/filter/FilterFilter.java
+++ 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/filter/FilterFilter.java
@@ -18,7 +18,7 @@
  */
 package org.apache.tinkerpop.machine.function.filter;
 
-import org.apache.tinkerpop.machine.bytecode.Compilation;
+import org.apache.tinkerpop.machine.bytecode.Argument;
 import org.apache.tinkerpop.machine.coefficient.Coefficient;
 import org.apache.tinkerpop.machine.function.AbstractFunction;
 import org.apache.tinkerpop.machine.function.FilterFunction;
@@ -32,21 +32,21 @@ import java.util.Set;
  */
 public final class FilterFilter<C, S> extends AbstractFunction<C> implements 
FilterFunction<C, S> {
 
-    private final Compilation<C, S, ?> internalFilter;
+    private final Argument<S> argument;
 
-    public FilterFilter(final Coefficient<C> coefficient, final Set<String> 
labels, final Compilation<C, S, ?> internalFilter) {
+    public FilterFilter(final Coefficient<C> coefficient, final Set<String> 
labels, final Argument<S> argument) {
         super(coefficient, labels);
-        this.internalFilter = internalFilter;
+        this.argument = argument;
     }
 
     @Override
     public boolean test(final Traverser<C, S> traverser) {
-        return this.internalFilter.filterTraverser(traverser);
+        return this.argument.filterArg(traverser);
     }
 
     @Override
     public String toString() {
-        return StringFactory.makeFunctionString(this, this.internalFilter);
+        return StringFactory.makeFunctionString(this, this.argument);
     }
 
 }
diff --git 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/filter/HasKeyFilter.java
 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/filter/HasKeyFilter.java
index efe2a5d..59c90da 100644
--- 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/filter/HasKeyFilter.java
+++ 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/filter/HasKeyFilter.java
@@ -47,9 +47,9 @@ public final class HasKeyFilter<C, K, V> extends 
AbstractFunction<C> implements
     public boolean test(final Traverser<C, Map<K, V>> traverser) {
         final Map<K, V> object = traverser.object();
         if (Pred.eq == this.predicate)
-            return object.containsKey(this.key.getArg(traverser));
+            return object.containsKey(this.key.mapArg(traverser));
         else {
-            final K testKey = this.key.getArg(traverser);
+            final K testKey = this.key.mapArg(traverser);
             for (final K key : traverser.object().keySet()) {
                 if (this.predicate.test(key, testKey))
                     return true;
diff --git 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/filter/HasKeyValueFilter.java
 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/filter/HasKeyValueFilter.java
index ddf11da..1a16f44 100644
--- 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/filter/HasKeyValueFilter.java
+++ 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/filter/HasKeyValueFilter.java
@@ -45,7 +45,7 @@ public final class HasKeyValueFilter<C, K, V> extends 
AbstractFunction<C> implem
     @Override
     public boolean test(final Traverser<C, Map<K, V>> traverser) {
         final Map<K, V> object = traverser.object();
-        return 
this.value.getArg(traverser).equals(object.get(this.key.getArg(traverser)));
+        return 
this.value.mapArg(traverser).equals(object.get(this.key.mapArg(traverser)));
     }
 
     @Override
diff --git 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/filter/IdentityFilter.java
 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/filter/IdentityFilter.java
deleted file mode 100644
index f1c0e6c..0000000
--- 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/filter/IdentityFilter.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.machine.function.filter;
-
-import org.apache.tinkerpop.machine.coefficient.Coefficient;
-import org.apache.tinkerpop.machine.function.AbstractFunction;
-import org.apache.tinkerpop.machine.function.FilterFunction;
-import org.apache.tinkerpop.machine.traverser.Traverser;
-
-import java.util.Set;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class IdentityFilter<C, S> extends AbstractFunction<C> implements 
FilterFunction<C, S> {
-
-    public IdentityFilter(final Coefficient<C> coefficient, final Set<String> 
labels) {
-        super(coefficient, labels);
-    }
-
-    @Override
-    public boolean test(final Traverser<C, S> traverser) {
-        return true;
-    }
-}
diff --git 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/filter/IsFilter.java
 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/filter/IsFilter.java
index 7831c73..05afb21 100644
--- 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/filter/IsFilter.java
+++ 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/filter/IsFilter.java
@@ -44,7 +44,7 @@ public final class IsFilter<C, S> extends AbstractFunction<C> 
implements FilterF
 
     @Override
     public boolean test(final Traverser<C, S> traverser) {
-        return this.predicate.test(traverser.object(), 
this.argument.getArg(traverser));
+        return this.predicate.test(traverser.object(), 
this.argument.mapArg(traverser));
     }
 
     @Override
diff --git 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/map/MapMap.java
 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/map/MapMap.java
index a94aa06..8ee8713 100644
--- 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/map/MapMap.java
+++ 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/map/MapMap.java
@@ -18,7 +18,7 @@
  */
 package org.apache.tinkerpop.machine.function.map;
 
-import org.apache.tinkerpop.machine.bytecode.Compilation;
+import org.apache.tinkerpop.machine.bytecode.Argument;
 import org.apache.tinkerpop.machine.coefficient.Coefficient;
 import org.apache.tinkerpop.machine.function.AbstractFunction;
 import org.apache.tinkerpop.machine.function.MapFunction;
@@ -32,20 +32,20 @@ import java.util.Set;
  */
 public class MapMap<C, S, E> extends AbstractFunction<C> implements 
MapFunction<C, S, E> {
 
-    private final Compilation<C, S, E> internalMap;
+    private final Argument<E> argument;
 
-    public MapMap(final Coefficient<C> coefficient, final Set<String> labels, 
final Compilation<C, S, E> internalMap) {
+    public MapMap(final Coefficient<C> coefficient, final Set<String> labels, 
final Argument<E> argument) {
         super(coefficient, labels);
-        this.internalMap = internalMap;
+        this.argument = argument;
     }
 
     @Override
     public E apply(final Traverser<C, S> traverser) {
-        return this.internalMap.mapTraverser(traverser).object();
+        return this.argument.mapArg(traverser);
     }
 
     @Override
     public String toString() {
-        return StringFactory.makeFunctionString(this, this.internalMap);
+        return StringFactory.makeFunctionString(this, this.argument);
     }
 }
diff --git 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/strategy/decoration/ExplainStrategy.java
 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/strategy/decoration/ExplainStrategy.java
index 9550274..0b6b682 100644
--- 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/strategy/decoration/ExplainStrategy.java
+++ 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/strategy/decoration/ExplainStrategy.java
@@ -48,7 +48,7 @@ public final class ExplainStrategy extends 
AbstractStrategy<Strategy.DecorationS
             bytecode.getInstructions().clear();
             bytecode.addInstruction(
                     BytecodeUtil.getCoefficient(clone).get(),
-                    Symbols.INJECT,
+                    Symbols.INITIAL,
                     ExplainStrategy.explainBytecode(clone));
         }
     }
diff --git 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/strategy/optimization/IdentityStrategy.java
 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/strategy/optimization/IdentityStrategy.java
index f3764bf..6c1bd99 100644
--- 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/strategy/optimization/IdentityStrategy.java
+++ 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/strategy/optimization/IdentityStrategy.java
@@ -31,7 +31,8 @@ public final class IdentityStrategy extends 
AbstractStrategy<Strategy.Optimizati
     @Override
     public <C> void apply(final Bytecode<C> bytecode) {
         bytecode.getInstructions().removeIf(instruction ->
-                instruction.op().equals(CoreCompiler.Symbols.IDENTITY) &&
+                instruction.op().equals(CoreCompiler.Symbols.FILTER) &&
+                        Boolean.TRUE.equals(instruction.args()[0]) &&
                         instruction.labels().isEmpty() &&
                         instruction.coefficient().isUnity());
     }
diff --git 
a/java/machine/machine-core/src/test/java/org/apache/tinkerpop/machine/bytecode/InstructionTest.java
 
b/java/machine/machine-core/src/test/java/org/apache/tinkerpop/machine/bytecode/InstructionTest.java
index 63736e9..ed2b723 100644
--- 
a/java/machine/machine-core/src/test/java/org/apache/tinkerpop/machine/bytecode/InstructionTest.java
+++ 
b/java/machine/machine-core/src/test/java/org/apache/tinkerpop/machine/bytecode/InstructionTest.java
@@ -29,11 +29,11 @@ import static 
org.junit.jupiter.api.Assertions.assertNotEquals;
  */
 class InstructionTest {
 
-    private final Instruction<Long> a = new 
Instruction<>(LongCoefficient.create(10L), "atest", new Object[]{1, 2, 3});
-    private final Instruction<Long> b = new 
Instruction<>(LongCoefficient.create(10L), "atest", new Object[]{1, 2, 3});
-    private final Instruction<Long> c = new 
Instruction<>(LongCoefficient.create(10L), "atest", new Object[]{1, 2});
-    private final Instruction<Long> d = new 
Instruction<>(LongCoefficient.create(10L), "btest", new Object[]{1, 2, 3});
-    private final Instruction<Long> e = new 
Instruction<>(LongCoefficient.create(1L), "btest", new Object[]{1, 2, 3});
+    private final Instruction<Long> a = new 
Instruction<>(LongCoefficient.create(10L), "atest", 1, 2, 3);
+    private final Instruction<Long> b = new 
Instruction<>(LongCoefficient.create(10L), "atest", 1, 2, 3);
+    private final Instruction<Long> c = new 
Instruction<>(LongCoefficient.create(10L), "atest", 1, 2);
+    private final Instruction<Long> d = new 
Instruction<>(LongCoefficient.create(10L), "btest", 1, 2, 3);
+    private final Instruction<Long> e = new 
Instruction<>(LongCoefficient.create(1L), "btest", 1, 2, 3);
 
     @Test
     void testMethods() {
diff --git 
a/java/machine/machine-core/src/test/java/org/apache/tinkerpop/machine/bytecode/SourceInstructionTest.java
 
b/java/machine/machine-core/src/test/java/org/apache/tinkerpop/machine/bytecode/SourceInstructionTest.java
index 50b4c87..0fe267e 100644
--- 
a/java/machine/machine-core/src/test/java/org/apache/tinkerpop/machine/bytecode/SourceInstructionTest.java
+++ 
b/java/machine/machine-core/src/test/java/org/apache/tinkerpop/machine/bytecode/SourceInstructionTest.java
@@ -28,10 +28,10 @@ import static 
org.junit.jupiter.api.Assertions.assertNotEquals;
  */
 class SourceInstructionTest {
 
-    private final SourceInstruction a = new SourceInstruction("atest", new 
Object[]{1, 2, 3});
-    private final SourceInstruction b = new SourceInstruction("atest", new 
Object[]{1, 2, 3});
-    private final SourceInstruction c = new SourceInstruction("atest", new 
Object[]{1, 2});
-    private final SourceInstruction d = new SourceInstruction("btest", new 
Object[]{1, 2, 3});
+    private final SourceInstruction a = new SourceInstruction("atest", 1, 2, 
3);
+    private final SourceInstruction b = new SourceInstruction("atest", 1, 2, 
3);
+    private final SourceInstruction c = new SourceInstruction("atest", 1, 2);
+    private final SourceInstruction d = new SourceInstruction("btest", 1, 2, 
3);
 
     @Test
     void testMethods() {
diff --git 
a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/BranchFn.java
 
b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/BranchFn.java
index 63ca3c7..57219e8 100644
--- 
a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/BranchFn.java
+++ 
b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/BranchFn.java
@@ -23,31 +23,41 @@ import org.apache.tinkerpop.machine.bytecode.Compilation;
 import org.apache.tinkerpop.machine.function.BranchFunction;
 import org.apache.tinkerpop.machine.traverser.Traverser;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class BranchFn<C, S, E, M> extends AbstractFn<C, S, S> {
+public class BranchFn<C, S, E> extends AbstractFn<C, S, S> {
 
-    private final Map<M, TupleTag<Traverser<C, S>>> branches;
-    private final Compilation<C, S, M> branchSelector;
+    private final Map<Compilation<C, S, ?>, List<TupleTag<Traverser<C, S>>>> 
branches;
+    private final List<TupleTag<Traverser<C, S>>> defaultBranches;
 
-    public BranchFn(final BranchFunction<C, S, E, M> branchFunction, final 
Map<M, TupleTag<Traverser<C, S>>> branches) {
+    public BranchFn(final BranchFunction<C, S, E> branchFunction, final 
Map<Compilation<C, S, ?>, List<TupleTag<Traverser<C, S>>>> branches) {
         super(branchFunction);
-        this.branches = branches;
-        this.branchSelector = branchFunction.getBranchSelector();
+        this.branches = new HashMap<>(branches);
+        this.defaultBranches = branches.getOrDefault(null, 
Collections.emptyList());
+        this.branches.remove(null);
     }
 
     @ProcessElement
     public void processElement(final @Element Traverser<C, S> traverser, final 
MultiOutputReceiver outputs) {
-        final Optional<Traverser<C, M>> selector = 
this.branchSelector.maybeTraverser(traverser);
-        if (selector.isPresent()) {
-            final TupleTag<Traverser<C, S>> outputTag = 
this.branches.get(selector.get().object());
-            if (null != outputTag)
-                outputs.get(outputTag).output(traverser.clone());
+        boolean found = false;
+        for (final Map.Entry<Compilation<C, S, ?>, List<TupleTag<Traverser<C, 
S>>>> entry : this.branches.entrySet()) {
+            if (entry.getKey().filterTraverser(traverser)) {
+                found = true;
+                for (final TupleTag<Traverser<C, S>> output : 
entry.getValue()) {
+                    outputs.get(output).output(traverser.clone());
+                }
+            }
+        }
+        if (!found) {
+            for (final TupleTag<Traverser<C, S>> output : 
this.defaultBranches) {
+                outputs.get(output).output(traverser.clone());
+            }
         }
-
     }
 }
\ No newline at end of file
diff --git 
a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/util/TopologyUtil.java
 
b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/util/TopologyUtil.java
index 28a5c40..c74c705 100644
--- 
a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/util/TopologyUtil.java
+++ 
b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/util/TopologyUtil.java
@@ -26,6 +26,15 @@ import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.tinkerpop.machine.bytecode.Compilation;
+import org.apache.tinkerpop.machine.function.BranchFunction;
+import org.apache.tinkerpop.machine.function.CFunction;
+import org.apache.tinkerpop.machine.function.FilterFunction;
+import org.apache.tinkerpop.machine.function.FlatMapFunction;
+import org.apache.tinkerpop.machine.function.InitialFunction;
+import org.apache.tinkerpop.machine.function.MapFunction;
+import org.apache.tinkerpop.machine.function.ReduceFunction;
+import org.apache.tinkerpop.machine.function.branch.RepeatBranch;
 import org.apache.tinkerpop.machine.processor.beam.Beam;
 import org.apache.tinkerpop.machine.processor.beam.BranchFn;
 import org.apache.tinkerpop.machine.processor.beam.FilterFn;
@@ -37,15 +46,6 @@ import 
org.apache.tinkerpop.machine.processor.beam.RepeatDeadEndFn;
 import org.apache.tinkerpop.machine.processor.beam.RepeatEndFn;
 import org.apache.tinkerpop.machine.processor.beam.RepeatStartFn;
 import 
org.apache.tinkerpop.machine.processor.beam.serialization.TraverserCoder;
-import org.apache.tinkerpop.machine.bytecode.Compilation;
-import org.apache.tinkerpop.machine.function.BranchFunction;
-import org.apache.tinkerpop.machine.function.CFunction;
-import org.apache.tinkerpop.machine.function.FilterFunction;
-import org.apache.tinkerpop.machine.function.FlatMapFunction;
-import org.apache.tinkerpop.machine.function.InitialFunction;
-import org.apache.tinkerpop.machine.function.MapFunction;
-import org.apache.tinkerpop.machine.function.ReduceFunction;
-import org.apache.tinkerpop.machine.function.branch.RepeatBranch;
 import org.apache.tinkerpop.machine.traverser.Traverser;
 import org.apache.tinkerpop.machine.traverser.TraverserFactory;
 
@@ -53,6 +53,7 @@ import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -107,20 +108,26 @@ public final class TopologyUtil {
             sink.setCoder(new TraverserCoder<>());
             sink = 
PCollectionList.of(repeatOutputs).apply(Flatten.pCollections());
         } else if (function instanceof BranchFunction) {
-            final BranchFunction<C, S, E, M> branchFunction = 
(BranchFunction<C, S, E, M>) function;
-            final Map<M, TupleTag<Traverser<C, S>>> selectors = new 
LinkedHashMap<>();
-            for (final Map.Entry<M, List<Compilation<C, S, E>>> branch : 
branchFunction.getBranches().entrySet()) {
-                selectors.put(branch.getKey(), new TupleTag<>());
+            final BranchFunction<C, S, E> branchFunction = (BranchFunction<C, 
S, E>) function;
+            final Map<Compilation<C, S, ?>, List<TupleTag<Traverser<C, S>>>> 
selectors = new LinkedHashMap<>();
+            for (final Map.Entry<Compilation<C, S, ?>, List<Compilation<C, S, 
E>>> branch : branchFunction.getBranches().entrySet()) {
+                final List<TupleTag<Traverser<C, S>>> tags = new ArrayList<>();
+                for (final Compilation<C, S, E> temp : branch.getValue()) {
+                    tags.add(new TupleTag<>());
+                }
+                selectors.put(branch.getKey(), tags);
             }
-            final BranchFn<C, S, E, M> fn = new BranchFn<>(branchFunction, 
selectors);
-            final List<TupleTag<Traverser<C, S>>> tags = new 
ArrayList<>(selectors.values());
+            final BranchFn<C, S, E> fn = new BranchFn<>(branchFunction, 
selectors);
+            final List<TupleTag<Traverser<C, S>>> tags = 
selectors.values().stream().flatMap(List::stream).collect(Collectors.toList());
             final PCollectionTuple outputs = 
source.apply(ParDo.of(fn).withOutputTags(tags.get(0), TupleTagList.of((List) 
tags.subList(1, tags.size()))));
             outputs.getAll().values().forEach(c -> c.setCoder(new 
TraverserCoder()));
             final List<PCollection<Traverser<C, E>>> branchSinks = new 
ArrayList<>();
-            for (final Map.Entry<M, List<Compilation<C, S, E>>> branch : 
branchFunction.getBranches().entrySet()) {
-                final PCollection<Traverser<C, S>> output = 
outputs.get(selectors.get(branch.getKey()));
-                for (final Compilation<C, S, E> compilation : 
branch.getValue()) {
-                    branchSinks.add(TopologyUtil.compile(output, compilation));
+            for (final Map.Entry<Compilation<C, S, ?>, List<Compilation<C, S, 
E>>> branches : branchFunction.getBranches().entrySet()) {
+                for (final TupleTag<Traverser<C, S>> tag : 
selectors.get(branches.getKey())) {
+                    final PCollection<Traverser<C, S>> output = 
outputs.get(tag);
+                    for (final Compilation<C, S, E> branch : 
branches.getValue()) {
+                        branchSinks.add(TopologyUtil.compile(output, branch));
+                    }
                 }
             }
             sink = 
PCollectionList.of(branchSinks).apply(Flatten.pCollections());
diff --git 
a/java/machine/processor/beam/src/test/java/org/apache/tinkerpop/machine/processor/beam/BeamTest.java
 
b/java/machine/processor/beam/src/test/java/org/apache/tinkerpop/machine/processor/beam/BeamTest.java
index c750e3e..c419966 100644
--- 
a/java/machine/processor/beam/src/test/java/org/apache/tinkerpop/machine/processor/beam/BeamTest.java
+++ 
b/java/machine/processor/beam/src/test/java/org/apache/tinkerpop/machine/processor/beam/BeamTest.java
@@ -99,12 +99,11 @@ public class BeamTest {
         System.out.println(traversal);
         System.out.println(traversal.toList());
         System.out.println("\n----------\n");
-        /*traversal = g.inject(1L, 2L, 
3L).repeat(__.<Long>incr().incr().incr()).until(is(10L));
-        
System.out.println(TraversalUtil.getBytecode(traversal).getSourceInstructions());
+        traversal = g.inject(7L).choose(__.is(7L), __.incr(), 
__.<Long>incr().incr());
         System.out.println(TraversalUtil.getBytecode(traversal));
         System.out.println(traversal);
         System.out.println(traversal.toList());
-        System.out.println("\n----------\n");*/
+        System.out.println("\n----------\n");
         traversal = g.inject(10L).choose(__.is(7L), __.incr(), 
__.<Long>incr().incr());
         System.out.println(TraversalUtil.getBytecode(traversal));
         System.out.println(traversal);
diff --git 
a/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/BranchStep.java
 
b/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/BranchStep.java
index 890efba..1a6b147 100644
--- 
a/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/BranchStep.java
+++ 
b/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/BranchStep.java
@@ -24,24 +24,25 @@ import org.apache.tinkerpop.machine.traverser.Traverser;
 import org.apache.tinkerpop.machine.util.EmptyIterator;
 import org.apache.tinkerpop.machine.util.MultiIterator;
 
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
 final class BranchStep<C, S, E, M> extends AbstractStep<C, S, E> {
 
-    private final Compilation<C, S, M> branchSelector;
-    private final Map<M, List<Compilation<C, S, E>>> branches;
+    private final Map<Compilation<C, S, ?>, List<Compilation<C, S, E>>> 
branches;
+    private final List<Compilation<C, S, E>> defaultBranches;
     private Iterator<Traverser<C, E>> nextTraversers = 
EmptyIterator.instance();
 
-    BranchStep(final Step<C, ?, S> previousStep, final BranchFunction<C, S, E, 
M> branchFunction) {
+    BranchStep(final Step<C, ?, S> previousStep, final BranchFunction<C, S, E> 
branchFunction) {
         super(previousStep, branchFunction);
-        this.branchSelector = branchFunction.getBranchSelector();
         this.branches = branchFunction.getBranches();
+        this.defaultBranches = this.branches.getOrDefault(null, 
Collections.emptyList());
+        this.branches.remove(null);
     }
 
     @Override
@@ -58,19 +59,22 @@ final class BranchStep<C, S, E, M> extends AbstractStep<C, 
S, E> {
 
     private final void stageOutput() {
         while (!this.nextTraversers.hasNext() && this.previousStep.hasNext()) {
+            boolean found = false;
+            this.nextTraversers = new MultiIterator<>();
             final Traverser<C, S> traverser = this.previousStep.next();
-            final Optional<Traverser<C, M>> token = 
this.branchSelector.maybeTraverser(traverser);
-            if (token.isPresent()) {
-                final List<Compilation<C, S, E>> matches = 
this.branches.get(token.get().object());
-                if (1 == matches.size())
-                    this.nextTraversers = 
matches.get(0).addTraverser(traverser);
-                else {
-                    this.nextTraversers = new MultiIterator<>();
-                    for (final Compilation<C, S, E> branch : matches) {
+            for (final Map.Entry<Compilation<C, S, ?>, List<Compilation<C, S, 
E>>> entry : this.branches.entrySet()) {
+                if (entry.getKey().filterTraverser(traverser)) {
+                    found = true;
+                    for (final Compilation<C, S, E> branch : entry.getValue()) 
{
                         ((MultiIterator<Traverser<C, E>>) 
this.nextTraversers).addIterator(branch.addTraverser(traverser));
                     }
                 }
             }
+            if (!found) {
+                for (final Compilation<C, S, E> defaultBranches : 
this.defaultBranches) {
+                    ((MultiIterator<Traverser<C, E>>) 
this.nextTraversers).addIterator(defaultBranches.addTraverser(traverser));
+                }
+            }
         }
     }
 
diff --git 
a/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/Pipes.java
 
b/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/Pipes.java
index 330a421..24f62c0 100644
--- 
a/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/Pipes.java
+++ 
b/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/Pipes.java
@@ -57,7 +57,7 @@ public final class Pipes<C, S, E> implements Processor<C, S, 
E> {
             if (function instanceof RepeatBranch)
                 nextStep = new RepeatStep(previousStep, (RepeatBranch<C, ?>) 
function);
             else if (function instanceof BranchFunction)
-                nextStep = new BranchStep(previousStep, (BranchFunction<C, ?, 
?, ?>) function);
+                nextStep = new BranchStep(previousStep, (BranchFunction<C, ?, 
?>) function);
             else if (function instanceof FilterFunction)
                 nextStep = new FilterStep(previousStep, (FilterFunction<C, ?>) 
function);
             else if (function instanceof FlatMapFunction)
diff --git 
a/java/machine/processor/pipes/src/test/java/org/apache/tinkerpop/machine/processor/pipes/PipesTest.java
 
b/java/machine/processor/pipes/src/test/java/org/apache/tinkerpop/machine/processor/pipes/PipesTest.java
index 546bb0c..624418e 100644
--- 
a/java/machine/processor/pipes/src/test/java/org/apache/tinkerpop/machine/processor/pipes/PipesTest.java
+++ 
b/java/machine/processor/pipes/src/test/java/org/apache/tinkerpop/machine/processor/pipes/PipesTest.java
@@ -49,7 +49,7 @@ public class PipesTest {
                 .withStructure(TinkerGraphStructure.class)
                 .withStrategy(IdentityStrategy.class);
 
-        Traversal<Long, ?, ?> traversal = 
g.V().identity().union(__.count(),__.count()).map(__.<Long,Long>count().identity()).explain();
+        Traversal<Long, ?, ?> traversal = g.V().identity().union(__.count(), 
__.count()).map(__.<Long, Long>count().identity()).explain();
         System.out.println(TraversalUtil.getBytecode(traversal));
         System.out.println(traversal);
         System.out.println(TraversalUtil.getBytecode(traversal));
@@ -104,6 +104,11 @@ public class PipesTest {
         System.out.println(traversal);
         System.out.println(traversal.toList());
         System.out.println("\n----------\n");
+        traversal = g.inject(8L).choose(__.is(7L), __.incr(), 
__.<Long>incr().incr());
+        System.out.println(TraversalUtil.getBytecode(traversal));
+        System.out.println(traversal);
+        System.out.println(traversal.toList());
+        System.out.println("\n----------\n");
         traversal = g.inject(7L, 7L, 7L, 2L).incr().barrier();
         System.out.println(TraversalUtil.getBytecode(traversal));
         System.out.println(traversal);

Reply via email to