This is an automated email from the ASF dual-hosted git repository.

okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit 49c7d208b85955b2f4906b21007da25b338f0b6c
Author: Marko A. Rodriguez <okramma...@gmail.com>
AuthorDate: Mon Mar 11 05:42:27 2019 -0600

    have the first stub of Apache Beam working in TP4. I was in the most 
nightmarish hell for the longest time --- I didn't have the machine pom.xml 
have beam as a submodule so changes I did to core did nothing. Had this 
serialization error and I spent a good 2 hours to ultimately realize that my 
code changes werent taking effect. So bad.
---
 .../org/apache/tinkerpop/language/Symbols.java     |   1 +
 .../org/apache/tinkerpop/language/Traversal.java   |   5 +
 .../apache/tinkerpop/language/TraversalSource.java |   2 +
 .../tinkerpop/machine/bytecode/Bytecode.java       |  36 ++++++-
 .../tinkerpop/machine/bytecode/BytecodeUtil.java   |   3 +
 .../SourceInstruction.java}                        |  23 +++--
 .../machine/coefficients/Coefficient.java          |   4 +-
 .../machine/coefficients/LongCoefficient.java      |   5 +-
 .../machine/functions/AbstractFunction.java        |  12 +--
 .../tinkerpop/machine/functions/CFunction.java     |   3 +-
 .../machine/functions/InitialFunction.java         |   4 +-
 .../tinkerpop/machine/functions/MapFunction.java   |   3 +-
 .../{MapFunction.java => ReduceFunction.java}      |   6 +-
 .../machine/functions/filter/IdentityFilter.java   |   3 +-
 .../machine/functions/filter/IsFilter.java         |   4 +-
 .../machine/functions/initial/InjectInitial.java   |   5 +-
 .../tinkerpop/machine/functions/map/IncrMap.java   |   5 +-
 .../tinkerpop/machine/functions/map/MapMap.java    |   5 +-
 .../tinkerpop/machine/functions/map/PathMap.java   |   4 +-
 .../{CFunction.java => reduce/BasicReducer.java}   |  23 +++--
 .../{map/IncrMap.java => reduce/CountReduce.java}  |  17 ++--
 .../{CFunction.java => reduce/Reducer.java}        |  13 +--
 .../{Traverser.java => CompleteTraverser.java}     |  28 +++---
 .../CompleteTraverserFactory.java}                 |  15 ++-
 .../apache/tinkerpop/machine/traversers/Path.java  |   3 +-
 .../tinkerpop/machine/traversers/Traverser.java    |  67 +++++++------
 .../TraverserFactory.java}                         |  11 +--
 .../tinkerpop/machine/traversers/TraverserSet.java |   6 +-
 java/machine/beam/pom.xml                          | 103 ++++++++++++++++++++
 .../org/apache/tinkerpop/machine/beam/Beam.java    | 106 +++++++++++++++++++++
 .../tinkerpop/machine/beam/BeamProcessor.java}     |  14 ++-
 .../apache/tinkerpop/machine/beam/FilterFn.java}   |  25 +++--
 .../org/apache/tinkerpop/machine/beam/MapFn.java}  |  18 ++--
 .../apache/tinkerpop/machine/beam/OutputStep.java} |  18 +++-
 .../tinkerpop/machine/beam/TraverserCoder.java     |  65 +++++++++++++
 .../apache/tinkerpop/machine/beam/BeamTest.java}   |  15 +--
 .../tinkerpop/machine/pipes/AbstractStep.java      |   2 +-
 .../apache/tinkerpop/machine/pipes/EmptyStep.java  |   4 +-
 .../apache/tinkerpop/machine/pipes/FilterStep.java |  37 ++++++-
 .../tinkerpop/machine/pipes/InitialStep.java       |   9 +-
 .../apache/tinkerpop/machine/pipes/MapStep.java    |   5 +-
 .../org/apache/tinkerpop/machine/pipes/Pipes.java  |  14 ++-
 .../apache/tinkerpop/machine/pipes/ReduceStep.java |  63 ++++++++++++
 .../apache/tinkerpop/machine/pipes/PipesTest.java  |   7 +-
 java/machine/pom.xml                               |   1 +
 java/pom.xml                                       |   3 +-
 46 files changed, 642 insertions(+), 183 deletions(-)

diff --git a/java/core/src/main/java/org/apache/tinkerpop/language/Symbols.java 
b/java/core/src/main/java/org/apache/tinkerpop/language/Symbols.java
index 681e26e..b302b88 100644
--- a/java/core/src/main/java/org/apache/tinkerpop/language/Symbols.java
+++ b/java/core/src/main/java/org/apache/tinkerpop/language/Symbols.java
@@ -25,6 +25,7 @@ public final class Symbols {
 
     public static final String AS = "as";
     public static final String C = "c";
+    public static final String COUNT = "count";
     public static final String IDENTITY = "identity";
     public static final String IS = "is";
     public static final String INCR = "incr";
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/language/Traversal.java 
b/java/core/src/main/java/org/apache/tinkerpop/language/Traversal.java
index 7acdd75..f8f446c 100644
--- a/java/core/src/main/java/org/apache/tinkerpop/language/Traversal.java
+++ b/java/core/src/main/java/org/apache/tinkerpop/language/Traversal.java
@@ -62,6 +62,11 @@ public class Traversal<C, S, E> implements Iterator<E> {
         return this;
     }
 
+    public Traversal<C, S, Long> count() {
+        this.bytecode.addInstruction(this.currentCoefficient, Symbols.COUNT);
+        return (Traversal) this;
+    }
+
     public Traversal<C, S, E> identity() {
         this.bytecode.addInstruction(this.currentCoefficient, 
Symbols.IDENTITY);
         return this;
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/language/TraversalSource.java 
b/java/core/src/main/java/org/apache/tinkerpop/language/TraversalSource.java
index be6df43..baf4069 100644
--- a/java/core/src/main/java/org/apache/tinkerpop/language/TraversalSource.java
+++ b/java/core/src/main/java/org/apache/tinkerpop/language/TraversalSource.java
@@ -18,6 +18,7 @@
  */
 package org.apache.tinkerpop.language;
 
+import org.apache.tinkerpop.machine.bytecode.Bytecode;
 import org.apache.tinkerpop.machine.coefficients.Coefficient;
 import org.apache.tinkerpop.machine.coefficients.LongCoefficient;
 import org.apache.tinkerpop.machine.compiler.Strategy;
@@ -31,6 +32,7 @@ import java.util.List;
  */
 public class TraversalSource<C> {
 
+    private Bytecode<C> bytecode = new Bytecode<>();
     private Coefficient<C> coefficient;
     private ProcessorFactory factory;
     private List<Strategy> strategies = new ArrayList<>();
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/Bytecode.java 
b/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/Bytecode.java
index 6b53da5..3310070 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/Bytecode.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/Bytecode.java
@@ -19,6 +19,9 @@
 package org.apache.tinkerpop.machine.bytecode;
 
 import org.apache.tinkerpop.machine.coefficients.Coefficient;
+import org.apache.tinkerpop.machine.compiler.Strategy;
+import org.apache.tinkerpop.machine.traversers.CompleteTraverserFactory;
+import org.apache.tinkerpop.machine.traversers.TraverserFactory;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -26,14 +29,22 @@ import java.util.List;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class Bytecode<C> {
+public class Bytecode<C> implements Cloneable {
 
-    public List<Instruction<C>> instructions;
+    public List<Strategy> strategies = new ArrayList<>();
+    public List<Instruction<C>> instructions = new ArrayList<>();
 
-    public Bytecode() {
-        this.instructions = new ArrayList<>();
+
+    public void addStrategy(final Strategy strategy) {
+        this.strategies.add(strategy);
+    }
+
+    public List<Strategy> getStrategies() {
+        return this.strategies;
     }
 
+    ///
+
     public void addInstruction(final Coefficient<C> coefficient, final String 
op, final Object... args) {
         this.instructions.add(new Instruction<>(coefficient.clone(), op, 
args));
         coefficient.unity();
@@ -51,8 +62,25 @@ public class Bytecode<C> {
         return this.instructions.get(this.instructions.size() - 1);
     }
 
+    // this should be part of processor!
+    public <S> TraverserFactory<C, S> getTraverserFactory() {
+        return new CompleteTraverserFactory<>();
+    }
+
     @Override
     public String toString() {
         return this.instructions.toString();
     }
+
+    @Override
+    public Bytecode<C> clone() {
+        try {
+            final Bytecode<C> clone = (Bytecode<C>) super.clone();
+            clone.strategies = new ArrayList<>(this.strategies);
+            clone.instructions = new ArrayList<>(this.instructions);
+            return clone;
+        } catch (final CloneNotSupportedException e) {
+            throw new RuntimeException(e.getMessage(), e);
+        }
+    }
 }
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java
index 8f560fb..2f29bba 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java
@@ -27,6 +27,7 @@ import 
org.apache.tinkerpop.machine.functions.initial.InjectInitial;
 import org.apache.tinkerpop.machine.functions.map.IncrMap;
 import org.apache.tinkerpop.machine.functions.map.MapMap;
 import org.apache.tinkerpop.machine.functions.map.PathMap;
+import org.apache.tinkerpop.machine.functions.reduce.CountReduce;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -60,6 +61,8 @@ public final class BytecodeUtil {
         final Coefficient<C> coefficient = instruction.coefficient();
         final Set<String> labels = instruction.labels();
         switch (op) {
+            case Symbols.COUNT:
+                return new CountReduce<>(coefficient, labels);
             case Symbols.IDENTITY:
                 return new IdentityFilter<>(coefficient, labels);
             case Symbols.INJECT:
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/CFunction.java 
b/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/SourceInstruction.java
similarity index 69%
copy from 
java/core/src/main/java/org/apache/tinkerpop/machine/functions/CFunction.java
copy to 
java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/SourceInstruction.java
index 144b77f..48fc0f0 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/CFunction.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/SourceInstruction.java
@@ -16,19 +16,26 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.tinkerpop.machine.functions;
-
-import org.apache.tinkerpop.machine.coefficients.Coefficient;
-
-import java.util.Set;
+package org.apache.tinkerpop.machine.bytecode;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public interface CFunction<C> {
+public final class SourceInstruction {
+
+    private final String op;
+    private final Object[] args;
 
-    public Coefficient<C> coefficient();
+    public SourceInstruction(final String op, final Object[] args) {
+        this.op = op;
+        this.args = args;
+    }
 
-    public Set<String> labels();
+    public String op() {
+        return this.op;
+    }
 
+    public Object[] args() {
+        return this.args;
+    }
 }
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/coefficients/Coefficient.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/coefficients/Coefficient.java
index bfad9d3..83c2ef4 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/coefficients/Coefficient.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/coefficients/Coefficient.java
@@ -18,10 +18,12 @@
  */
 package org.apache.tinkerpop.machine.coefficients;
 
+import java.io.Serializable;
+
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public interface Coefficient<C> extends Cloneable {
+public interface Coefficient<C> extends Cloneable, Serializable {
 
     public Coefficient<C> sum(final C other);
 
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/coefficients/LongCoefficient.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/coefficients/LongCoefficient.java
index 8762553..f322383 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/coefficients/LongCoefficient.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/coefficients/LongCoefficient.java
@@ -23,12 +23,15 @@ package org.apache.tinkerpop.machine.coefficients;
  */
 public class LongCoefficient implements Coefficient<Long> {
 
-    private Long value;
+    protected Long value;
 
     private LongCoefficient(final Long value) {
         this.value = value;
     }
 
+    public LongCoefficient() {
+    }
+
     @Override
     public Coefficient<Long> sum(final Long other) {
         this.value = this.value + other;
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/AbstractFunction.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/AbstractFunction.java
index 5323696..d9d149b 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/AbstractFunction.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/AbstractFunction.java
@@ -19,7 +19,6 @@
 package org.apache.tinkerpop.machine.functions;
 
 import org.apache.tinkerpop.machine.coefficients.Coefficient;
-import org.apache.tinkerpop.machine.traversers.Traverser;
 import org.apache.tinkerpop.util.StringFactory;
 
 import java.util.Set;
@@ -29,7 +28,7 @@ import java.util.Set;
  */
 public abstract class AbstractFunction<C, S, E> implements CFunction<C> {
 
-    protected final Coefficient<C> coefficient;
+    private Coefficient<C> coefficient;
     private Set<String> labels;
 
     public AbstractFunction(final Coefficient<C> coefficient, final 
Set<String> labels) {
@@ -47,15 +46,6 @@ public abstract class AbstractFunction<C, S, E> implements 
CFunction<C> {
         return this.labels;
     }
 
-    protected Traverser<C, E> postProcess(final Traverser<C, E> traverser) {
-        traverser.coefficient().multiply(this.coefficient.value());
-        for (final String label : this.labels) {
-            traverser.addLabel(label);
-        }
-        return traverser;
-    }
-
-
     @Override
     public String toString() {
         return StringFactory.makeFunctionString(this);
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/CFunction.java 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/CFunction.java
index 144b77f..568c713 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/CFunction.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/CFunction.java
@@ -20,12 +20,13 @@ package org.apache.tinkerpop.machine.functions;
 
 import org.apache.tinkerpop.machine.coefficients.Coefficient;
 
+import java.io.Serializable;
 import java.util.Set;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public interface CFunction<C> {
+public interface CFunction<C> extends Serializable {
 
     public Coefficient<C> coefficient();
 
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/InitialFunction.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/InitialFunction.java
index d44c47b..4174e16 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/InitialFunction.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/InitialFunction.java
@@ -18,13 +18,11 @@
  */
 package org.apache.tinkerpop.machine.functions;
 
-import org.apache.tinkerpop.machine.traversers.Traverser;
-
 import java.util.Iterator;
 import java.util.function.Supplier;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public interface InitialFunction<C, S> extends Supplier<Iterator<Traverser<C, 
S>>>, CFunction<C> {
+public interface InitialFunction<C, S> extends Supplier<Iterator<S>>, 
CFunction<C> {
 }
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/MapFunction.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/MapFunction.java
index f5b5302..d7cb898 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/MapFunction.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/MapFunction.java
@@ -20,11 +20,12 @@ package org.apache.tinkerpop.machine.functions;
 
 import org.apache.tinkerpop.machine.traversers.Traverser;
 
+import java.io.Serializable;
 import java.util.function.Function;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public interface MapFunction<C, S, E> extends Function<Traverser<C, S>, 
Traverser<C, E>>, CFunction<C> {
+public interface MapFunction<C, S, E> extends Function<Traverser<C, S>, E>, 
CFunction<C> {
 
 }
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/MapFunction.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/ReduceFunction.java
similarity index 85%
copy from 
java/core/src/main/java/org/apache/tinkerpop/machine/functions/MapFunction.java
copy to 
java/core/src/main/java/org/apache/tinkerpop/machine/functions/ReduceFunction.java
index f5b5302..c2ceb0d 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/MapFunction.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/ReduceFunction.java
@@ -20,11 +20,13 @@ package org.apache.tinkerpop.machine.functions;
 
 import org.apache.tinkerpop.machine.traversers.Traverser;
 
-import java.util.function.Function;
+import java.util.function.BiFunction;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public interface MapFunction<C, S, E> extends Function<Traverser<C, S>, 
Traverser<C, E>>, CFunction<C> {
+public interface ReduceFunction<C, S, E> extends BiFunction<Traverser<C, S>, 
E, E>, CFunction<C> {
 
+    public E getInitialValue();
 }
+
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/IdentityFilter.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/IdentityFilter.java
index 544f674..e630242 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/IdentityFilter.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/IdentityFilter.java
@@ -28,7 +28,7 @@ import java.util.Set;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class IdentityFilter<C, S> extends AbstractFunction<C,S,S> implements 
FilterFunction<C, S> {
+public class IdentityFilter<C, S> extends AbstractFunction<C, S, S> implements 
FilterFunction<C, S> {
 
     public IdentityFilter(final Coefficient<C> coefficient, final Set<String> 
labels) {
         super(coefficient, labels);
@@ -36,7 +36,6 @@ public class IdentityFilter<C, S> extends 
AbstractFunction<C,S,S> implements Fil
 
     @Override
     public boolean test(final Traverser<C, S> traverser) {
-        super.postProcess(traverser);
         return true;
     }
 }
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/IsFilter.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/IsFilter.java
index c0f2a98..cbdd8ed 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/IsFilter.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/IsFilter.java
@@ -21,6 +21,7 @@ package org.apache.tinkerpop.machine.functions.filter;
 import org.apache.tinkerpop.machine.coefficients.Coefficient;
 import org.apache.tinkerpop.machine.functions.AbstractFunction;
 import org.apache.tinkerpop.machine.functions.FilterFunction;
+import org.apache.tinkerpop.machine.traversers.CompleteTraverser;
 import org.apache.tinkerpop.machine.traversers.Traverser;
 import org.apache.tinkerpop.util.StringFactory;
 
@@ -31,7 +32,7 @@ import java.util.Set;
  */
 public class IsFilter<C, S> extends AbstractFunction<C, S, S> implements 
FilterFunction<C, S> {
 
-    private final S object;
+    private S object;
 
     public IsFilter(final Coefficient<C> coefficient, final Set<String> 
labels, final S object) {
         super(coefficient, labels);
@@ -40,7 +41,6 @@ public class IsFilter<C, S> extends AbstractFunction<C, S, S> 
implements FilterF
 
     @Override
     public boolean test(final Traverser<C, S> traverser) {
-        super.postProcess(traverser);
         return traverser.object().equals(this.object);
     }
 
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/initial/InjectInitial.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/initial/InjectInitial.java
index de93cc5..4c15206 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/initial/InjectInitial.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/initial/InjectInitial.java
@@ -21,7 +21,6 @@ package org.apache.tinkerpop.machine.functions.initial;
 import org.apache.tinkerpop.machine.coefficients.Coefficient;
 import org.apache.tinkerpop.machine.functions.AbstractFunction;
 import org.apache.tinkerpop.machine.functions.InitialFunction;
-import org.apache.tinkerpop.machine.traversers.Traverser;
 import org.apache.tinkerpop.util.StringFactory;
 
 import java.util.Iterator;
@@ -41,8 +40,8 @@ public class InjectInitial<C, S> extends AbstractFunction<C, 
S, S> implements In
     }
 
     @Override
-    public Iterator<Traverser<C, S>> get() {
-        return Stream.of(this.objects).map(object -> new 
Traverser<>(this.coefficient.clone(), object)).iterator();
+    public Iterator<S> get() {
+        return Stream.of(this.objects).iterator();
     }
 
     @Override
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/IncrMap.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/IncrMap.java
index 1e5964f..e1eff44 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/IncrMap.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/IncrMap.java
@@ -35,7 +35,8 @@ public class IncrMap<C> extends AbstractFunction<C, Long, 
Long> implements MapFu
     }
 
     @Override
-    public Traverser<C, Long> apply(final Traverser<C, Long> traverser) {
-        return postProcess(traverser.split(traverser.object() + 1));
+    public Long apply(final Traverser<C, Long> traverser) {
+        return traverser.object() + 1L;
     }
+
 }
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/MapMap.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/MapMap.java
index f3681b6..8237fbf 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/MapMap.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/MapMap.java
@@ -24,6 +24,7 @@ import org.apache.tinkerpop.machine.functions.CFunction;
 import org.apache.tinkerpop.machine.functions.MapFunction;
 import org.apache.tinkerpop.machine.functions.NestedFunction;
 import org.apache.tinkerpop.machine.processor.Processor;
+import org.apache.tinkerpop.machine.traversers.CompleteTraverser;
 import org.apache.tinkerpop.machine.traversers.Traverser;
 import org.apache.tinkerpop.util.StringFactory;
 
@@ -44,10 +45,10 @@ public class MapMap<C, S, E> extends AbstractFunction<C, S, 
E> implements MapFun
     }
 
     @Override
-    public Traverser<C, E> apply(final Traverser<C, S> traverser) {
+    public E apply(final Traverser<C, S> traverser) {
         this.processor.reset();
         this.processor.addStart(traverser);
-        return 
super.postProcess(traverser.split(this.processor.next().object()));
+        return this.processor.next().object();
     }
 
     public void setProcessor(final Processor<C, S, E> processor) {
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/PathMap.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/PathMap.java
index 051d647..4c955d2 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/PathMap.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/PathMap.java
@@ -36,7 +36,7 @@ public class PathMap<C, S> extends AbstractFunction<C, S, 
Path> implements MapFu
     }
 
     @Override
-    public Traverser<C, Path> apply(final Traverser<C, S> traverser) {
-        return super.postProcess(traverser.split(traverser.path()));
+    public Path apply(final Traverser<C, S> traverser) {
+        return traverser.path();
     }
 }
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/CFunction.java 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/BasicReducer.java
similarity index 71%
copy from 
java/core/src/main/java/org/apache/tinkerpop/machine/functions/CFunction.java
copy to 
java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/BasicReducer.java
index 144b77f..40c00c1 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/CFunction.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/BasicReducer.java
@@ -16,19 +16,26 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.tinkerpop.machine.functions;
-
-import org.apache.tinkerpop.machine.coefficients.Coefficient;
-
-import java.util.Set;
+package org.apache.tinkerpop.machine.functions.reduce;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public interface CFunction<C> {
+public class BasicReducer<S> implements Reducer<S> {
+
+    private S value;
+
+    public BasicReducer(final S initialValue) {
+        this.value = initialValue;
+    }
+
+    public S get() {
+        return this.value;
+    }
 
-    public Coefficient<C> coefficient();
+    public void update(final S newValue) {
+        this.value = newValue;
+    }
 
-    public Set<String> labels();
 
 }
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/IncrMap.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/CountReduce.java
similarity index 68%
copy from 
java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/IncrMap.java
copy to 
java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/CountReduce.java
index 1e5964f..8c674c3 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/IncrMap.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/CountReduce.java
@@ -16,11 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.tinkerpop.machine.functions.map;
+package org.apache.tinkerpop.machine.functions.reduce;
 
 import org.apache.tinkerpop.machine.coefficients.Coefficient;
 import org.apache.tinkerpop.machine.functions.AbstractFunction;
-import org.apache.tinkerpop.machine.functions.MapFunction;
+import org.apache.tinkerpop.machine.functions.ReduceFunction;
 import org.apache.tinkerpop.machine.traversers.Traverser;
 
 import java.util.Set;
@@ -28,14 +28,19 @@ import java.util.Set;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class IncrMap<C> extends AbstractFunction<C, Long, Long> implements 
MapFunction<C, Long, Long> {
+public class CountReduce<C, S> extends AbstractFunction<C, S, Long> implements 
ReduceFunction<C, S, Long> {
 
-    public IncrMap(final Coefficient<C> coefficient, final Set<String> labels) 
{
+    public CountReduce(final Coefficient<C> coefficient, final Set<String> 
labels) {
         super(coefficient, labels);
     }
 
     @Override
-    public Traverser<C, Long> apply(final Traverser<C, Long> traverser) {
-        return postProcess(traverser.split(traverser.object() + 1));
+    public Long apply(final Traverser<C, S> traverser, final Long 
currentValue) {
+        return currentValue + traverser.coefficient().count();
+    }
+
+    @Override
+    public Long getInitialValue() {
+        return 0L;
     }
 }
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/CFunction.java 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/Reducer.java
similarity index 78%
copy from 
java/core/src/main/java/org/apache/tinkerpop/machine/functions/CFunction.java
copy to 
java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/Reducer.java
index 144b77f..8434ca7 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/CFunction.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/Reducer.java
@@ -16,19 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.tinkerpop.machine.functions;
-
-import org.apache.tinkerpop.machine.coefficients.Coefficient;
-
-import java.util.Set;
+package org.apache.tinkerpop.machine.functions.reduce;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public interface CFunction<C> {
-
-    public Coefficient<C> coefficient();
+public interface Reducer<S> {
 
-    public Set<String> labels();
+    public S get();
 
+    public void update(final S newValue);
 }
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/Traverser.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/CompleteTraverser.java
similarity index 70%
copy from 
java/core/src/main/java/org/apache/tinkerpop/machine/traversers/Traverser.java
copy to 
java/core/src/main/java/org/apache/tinkerpop/machine/traversers/CompleteTraverser.java
index 4d2e92a..81d17fc 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/Traverser.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/CompleteTraverser.java
@@ -20,19 +20,23 @@ package org.apache.tinkerpop.machine.traversers;
 
 import org.apache.tinkerpop.machine.coefficients.Coefficient;
 
+import java.io.Serializable;
 import java.util.Collections;
-import java.util.HashSet;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class Traverser<C, S> {
+public class CompleteTraverser<C, S> implements Traverser<C, S> {
 
-    private final Coefficient<C> coefficient;
-    private final S object;
+    private Coefficient<C> coefficient;
+    private S object;
     private Path path = new Path();
 
-    public Traverser(final Coefficient<C> coefficient, final S object) {
+    public CompleteTraverser() {
+
+    }
+
+    public CompleteTraverser(final Coefficient<C> coefficient, final S object) 
{
         this.coefficient = coefficient;
         this.object = object;
     }
@@ -53,20 +57,22 @@ public class Traverser<C, S> {
         this.path.addLabels(Collections.singleton(label));
     }
 
-    public <B> Traverser<C, B> split(final B object) {
-        final Traverser<C, B> traverser = new 
Traverser<>(this.coefficient.clone(), object);
-        traverser.path = new Path(this.path);
-        traverser.path.add(new HashSet<>(), object);
-        return traverser;
+    @Override
+    public <E> Traverser<C, E> split(final Coefficient<C> eCoefficient, final 
E eObject) {
+        final CompleteTraverser<C, E> clone = new 
CompleteTraverser<>(eCoefficient, eObject);
+        clone.path = new Path(this.path);
+        return clone;
     }
 
     @Override
     public boolean equals(final Object other) {
-        return other instanceof Traverser && ((Traverser<C, S>) 
other).object.equals(this.object);
+        return other instanceof CompleteTraverser && ((CompleteTraverser<C, 
S>) other).object.equals(this.object);
     }
 
     @Override
     public String toString() {
         return this.object.toString();
     }
+
+
 }
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/CFunction.java 
b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/CompleteTraverserFactory.java
similarity index 75%
copy from 
java/core/src/main/java/org/apache/tinkerpop/machine/functions/CFunction.java
copy to 
java/core/src/main/java/org/apache/tinkerpop/machine/traversers/CompleteTraverserFactory.java
index 144b77f..7d9356f 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/CFunction.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/CompleteTraverserFactory.java
@@ -16,19 +16,16 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.tinkerpop.machine.functions;
+package org.apache.tinkerpop.machine.traversers;
 
 import org.apache.tinkerpop.machine.coefficients.Coefficient;
 
-import java.util.Set;
-
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public interface CFunction<C> {
-
-    public Coefficient<C> coefficient();
-
-    public Set<String> labels();
-
+public class CompleteTraverserFactory<C, S> implements TraverserFactory<C, S> {
+    @Override
+    public Traverser<C, S> create(final Coefficient<C> coefficient, final S 
object) {
+        return new CompleteTraverser<>(coefficient.clone(), object);
+    }
 }
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/Path.java 
b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/Path.java
index d318971..7509824 100644
--- a/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/Path.java
+++ b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/Path.java
@@ -18,6 +18,7 @@
  */
 package org.apache.tinkerpop.machine.traversers;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
@@ -25,7 +26,7 @@ import java.util.Set;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class Path {
+public class Path implements Serializable {
 
     private final List<Object> objects = new ArrayList<>();
     private final List<Set<String>> labels = new ArrayList<>();
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/Traverser.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/Traverser.java
index 4d2e92a..27c9f92 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/Traverser.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/Traverser.java
@@ -19,54 +19,63 @@
 package org.apache.tinkerpop.machine.traversers;
 
 import org.apache.tinkerpop.machine.coefficients.Coefficient;
+import org.apache.tinkerpop.machine.functions.FilterFunction;
+import org.apache.tinkerpop.machine.functions.FlatMapFunction;
+import org.apache.tinkerpop.machine.functions.MapFunction;
+import org.apache.tinkerpop.machine.functions.reduce.Reducer;
 
+import java.io.Serializable;
 import java.util.Collections;
-import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class Traverser<C, S> {
+public interface Traverser<C, S> extends Serializable {
 
-    private final Coefficient<C> coefficient;
-    private final S object;
-    private Path path = new Path();
+    public Coefficient<C> coefficient();
 
-    public Traverser(final Coefficient<C> coefficient, final S object) {
-        this.coefficient = coefficient;
-        this.object = object;
-    }
+    public S object();
 
-    public Coefficient<C> coefficient() {
-        return this.coefficient;
-    }
+    public Path path();
 
-    public S object() {
-        return this.object;
-    }
+    public void addLabel(final String label);
 
-    public Path path() {
-        return this.path;
+    public default void addLabels(final Set<String> labels) {
+        for (final String label : labels) {
+            this.addLabel(label);
+        }
     }
 
-    public void addLabel(final String label) {
-        this.path.addLabels(Collections.singleton(label));
+    public default boolean filter(final FilterFunction<C, S> function) {
+        if (function.test(this)) {
+            this.coefficient().multiply(function.coefficient().value());
+            this.addLabels(function.labels());
+            return true;
+        } else {
+            return false;
+        }
     }
 
-    public <B> Traverser<C, B> split(final B object) {
-        final Traverser<C, B> traverser = new 
Traverser<>(this.coefficient.clone(), object);
-        traverser.path = new Path(this.path);
-        traverser.path.add(new HashSet<>(), object);
+    public default <E> Traverser<C, E> map(final MapFunction<C, S, E> 
function) {
+        final Coefficient<C> eCoefficient = 
this.coefficient().clone().multiply(function.coefficient().value());
+        final E eObject = function.apply(this);
+        final Traverser<C, E> traverser = this.split(eCoefficient, eObject);
+        traverser.addLabels(function.labels());
         return traverser;
     }
 
-    @Override
-    public boolean equals(final Object other) {
-        return other instanceof Traverser && ((Traverser<C, S>) 
other).object.equals(this.object);
+    public default <E> Iterator<Traverser<C, E>> flatMap(final 
FlatMapFunction<C, S, E> function) {
+        return Collections.emptyIterator();
     }
 
-    @Override
-    public String toString() {
-        return this.object.toString();
+    //public default void sideEffect(final SideEffectFunction<C,S> function);
+
+    public default <E> Traverser<C, E> reduce(final Reducer<E> reducer) {
+        final Traverser<C, E> traverser = 
this.split(this.coefficient().clone().unity(), reducer.get());
+        return traverser;
     }
+
+    public <E> Traverser<C, E> split(final Coefficient<C> coefficient, final E 
object);
 }
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/CFunction.java 
b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/TraverserFactory.java
similarity index 83%
copy from 
java/core/src/main/java/org/apache/tinkerpop/machine/functions/CFunction.java
copy to 
java/core/src/main/java/org/apache/tinkerpop/machine/traversers/TraverserFactory.java
index 144b77f..22f2911 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/CFunction.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/TraverserFactory.java
@@ -16,19 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.tinkerpop.machine.functions;
+package org.apache.tinkerpop.machine.traversers;
 
 import org.apache.tinkerpop.machine.coefficients.Coefficient;
 
-import java.util.Set;
-
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public interface CFunction<C> {
-
-    public Coefficient<C> coefficient();
-
-    public Set<String> labels();
+public interface TraverserFactory<C, S> {
 
+    public Traverser<C, S> create(final Coefficient<C> coefficient, final S 
object);
 }
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/TraverserSet.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/TraverserSet.java
index a7c011b..ce52b63 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/TraverserSet.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/TraverserSet.java
@@ -140,8 +140,8 @@ public class TraverserSet<C, S> extends 
AbstractSet<Traverser<C, S>> implements
         return this.map.values().toString();
     }
 
-    /*public void sort(final Comparator<Traverser<S>> comparator) {
-        final List<Traverser<C, S>> list = new ArrayList<>(this.map.size());
+    /*public void sort(final Comparator<CompleteTraverser<S>> comparator) {
+        final List<CompleteTraverser<C, S>> list = new 
ArrayList<>(this.map.size());
         
IteratorUtils.removeOnNext(this.map.values().iterator()).forEachRemaining(list::add);
         Collections.sort(list, comparator);
         this.map.reset();
@@ -149,7 +149,7 @@ public class TraverserSet<C, S> extends 
AbstractSet<Traverser<C, S>> implements
     }
 
     public void shuffle() {
-        final List<Traverser<C, S>> list = new ArrayList<>(this.map.size());
+        final List<CompleteTraverser<C, S>> list = new 
ArrayList<>(this.map.size());
         
IteratorUtils.removeOnNext(this.map.values().iterator()).forEachRemaining(list::add);
         Collections.shuffle(list);
         this.map.reset();
diff --git a/java/machine/beam/pom.xml b/java/machine/beam/pom.xml
new file mode 100644
index 0000000..2fb4c82
--- /dev/null
+++ b/java/machine/beam/pom.xml
@@ -0,0 +1,103 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <artifactId>machine</artifactId>
+        <groupId>org.apache.tinkerpop</groupId>
+        <version>4.0.0-SNAPSHOT</version>
+    </parent>
+    <name>Apache TinkerPop (Java) :: Machine :: Beam</name>
+    <artifactId>beam</artifactId>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.tinkerpop</groupId>
+            <artifactId>core</artifactId>
+            <version>4.0.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.beam</groupId>
+            <artifactId>beam-sdks-java-core</artifactId>
+            <version>2.11.0</version>
+            <!-- APACHE BEAM DOES NOT USE ENFORCER PLUGIN -->
+            <exclusions>
+                <exclusion>
+                    <groupId>org.tukaani</groupId>
+                    <artifactId>xz</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.xerial.snappy</groupId>
+                    <artifactId>snappy-java</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-annotations</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+            <version>2.9.8</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.7.25</version>
+        </dependency>
+        <dependency>
+            <groupId>org.xerial.snappy</groupId>
+            <artifactId>snappy-java</artifactId>
+            <version>1.1.1.7</version>
+        </dependency>
+        <dependency>
+            <groupId>org.tukaani</groupId>
+            <artifactId>xz</artifactId>
+            <version>1.8</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.beam</groupId>
+            <artifactId>beam-runners-direct-java</artifactId>
+            <version>2.11.0</version>
+            <scope>runtime</scope>
+        </dependency>
+    </dependencies>
+    <build>
+        <directory>${basedir}/target</directory>
+        <finalName>${project.artifactId}-${project.version}</finalName>
+        <testResources>
+            <testResource>
+                <directory>${basedir}/src/test/resources
+                </directory>
+            </testResource>
+        </testResources>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-failsafe-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java
new file mode 100644
index 0000000..a03ee71
--- /dev/null
+++ 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.machine.beam;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.tinkerpop.machine.bytecode.Bytecode;
+import org.apache.tinkerpop.machine.bytecode.BytecodeUtil;
+import org.apache.tinkerpop.machine.coefficients.LongCoefficient;
+import org.apache.tinkerpop.machine.functions.CFunction;
+import org.apache.tinkerpop.machine.functions.FilterFunction;
+import org.apache.tinkerpop.machine.functions.MapFunction;
+import org.apache.tinkerpop.machine.functions.initial.InjectInitial;
+import org.apache.tinkerpop.machine.processor.Processor;
+import org.apache.tinkerpop.machine.traversers.CompleteTraverser;
+import org.apache.tinkerpop.machine.traversers.Traverser;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class Beam<C, S, E> implements Processor<C, S, E> {
+
+    final Pipeline pipeline;
+    PCollection collection;
+    public static List<Traverser> OUTPUT = new ArrayList<>();
+    Iterator<Traverser> iterator = null;
+
+    public Beam(final Bytecode<C> bytecode) {
+        this.pipeline = Pipeline.create();
+        
this.pipeline.getCoderRegistry().registerCoderForClass(Traverser.class, new 
TraverserCoder<>());
+
+        for (final CFunction<?> function : BytecodeUtil.compile(bytecode)) {
+            if (function instanceof InjectInitial) {
+                final List<Traverser<C, S>> objects = new ArrayList<>();
+                final Iterator<S> iterator = ((InjectInitial) function).get();
+                while (iterator.hasNext())
+                    objects.add(new 
CompleteTraverser(LongCoefficient.create(), iterator.next()));
+                this.collection = 
this.pipeline.apply(Create.of(objects).withCoder(new TraverserCoder<>()));
+            } else if (function instanceof FilterFunction) {
+                collection = (PCollection) collection.apply(ParDo.of(new 
FilterFn<>((FilterFunction<C, S>) function)));
+                collection.setCoder(new TraverserCoder());
+            } else if (function instanceof MapFunction) {
+                collection = (PCollection) collection.apply(ParDo.of(new 
MapFn<>((MapFunction<C, S, E>) function)));
+                collection.setCoder(new TraverserCoder());
+            } else
+                throw new RuntimeException("You need a new step type:" + 
function);
+        }
+        collection = (PCollection) collection.apply(ParDo.of(new 
OutputStep()));
+
+    }
+
+    @Override
+    public void addStart(Traverser<C, S> traverser) {
+
+    }
+
+    @Override
+    public Traverser<C, E> next() {
+        if (null == this.iterator) {
+            pipeline.run().waitUntilFinish();
+            this.iterator = OUTPUT.iterator();
+        }
+        return this.iterator.next();
+    }
+
+    @Override
+    public boolean hasNext() {
+        if (null == this.iterator) {
+            pipeline.run().waitUntilFinish();
+            this.iterator = OUTPUT.iterator();
+        }
+        return this.iterator.hasNext();
+    }
+
+    @Override
+    public void reset() {
+
+    }
+
+    @Override
+    public String toString() {
+        return this.pipeline.toString();
+    }
+}
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/MapFunction.java
 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BeamProcessor.java
similarity index 68%
copy from 
java/core/src/main/java/org/apache/tinkerpop/machine/functions/MapFunction.java
copy to 
java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BeamProcessor.java
index f5b5302..7da3f47 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/MapFunction.java
+++ 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BeamProcessor.java
@@ -16,15 +16,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.tinkerpop.machine.functions;
+package org.apache.tinkerpop.machine.beam;
 
-import org.apache.tinkerpop.machine.traversers.Traverser;
-
-import java.util.function.Function;
+import org.apache.tinkerpop.machine.bytecode.Bytecode;
+import org.apache.tinkerpop.machine.processor.Processor;
+import org.apache.tinkerpop.machine.processor.ProcessorFactory;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public interface MapFunction<C, S, E> extends Function<Traverser<C, S>, 
Traverser<C, E>>, CFunction<C> {
+public class BeamProcessor implements ProcessorFactory {
 
+    @Override
+    public <C, S, E> Processor<C, S, E> mint(final Bytecode<C> bytecode) {
+        return new Beam<>(bytecode);
+    }
 }
diff --git 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/FilterStep.java
 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/FilterFn.java
similarity index 64%
copy from 
java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/FilterStep.java
copy to 
java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/FilterFn.java
index eb1d1e2..a3d1f9d 100644
--- 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/FilterStep.java
+++ 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/FilterFn.java
@@ -16,27 +16,26 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.tinkerpop.machine.pipes;
+package org.apache.tinkerpop.machine.beam;
 
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.tinkerpop.machine.functions.FilterFunction;
 import org.apache.tinkerpop.machine.traversers.Traverser;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class FilterStep<C, S> extends AbstractStep<C, S, S> {
+public class FilterFn<C, S> extends DoFn<Traverser<C, S>, Traverser<C, S>> {
 
-    public FilterStep(final AbstractStep<C, ?, S> previousStep, final 
FilterFunction<C, S> filterFunction) {
-        super(previousStep, filterFunction);
+    private FilterFunction<C, S> filterFunction;
+
+    public FilterFn(final FilterFunction<C, S> filterFunction) {
+        this.filterFunction = filterFunction;
     }
 
-    @Override
-    public Traverser<C, S> next() {
-        Traverser<C, S> traverser;
-        while (true) {
-            traverser = this.processNextTraverser();
-            if (((FilterFunction<C, S>) this.function).test(traverser))
-                return traverser;
-        }
+    @ProcessElement
+    public void processElement(final @Element Traverser<C, S> traverser, final 
OutputReceiver<Traverser<C, S>> output) {
+        if (traverser.filter(this.filterFunction))
+            output.output(traverser);
     }
-}
+}
\ No newline at end of file
diff --git 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/MapStep.java
 b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/MapFn.java
similarity index 64%
copy from 
java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/MapStep.java
copy to 
java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/MapFn.java
index 1216491..2b83ab2 100644
--- 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/MapStep.java
+++ 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/MapFn.java
@@ -16,22 +16,26 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.tinkerpop.machine.pipes;
+package org.apache.tinkerpop.machine.beam;
 
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.tinkerpop.machine.functions.MapFunction;
 import org.apache.tinkerpop.machine.traversers.Traverser;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class MapStep<C, S, E> extends AbstractStep<C, S, E> {
+public class MapFn<C, S, E> extends DoFn<Traverser<C, S>, Traverser<C, E>> {
 
-    public MapStep(final AbstractStep<C, ?, S> previousStep, final 
MapFunction<C, S, E> mapFunction) {
-        super(previousStep, mapFunction);
+    private final MapFunction<C, S, E> mapFunction;
+
+    public MapFn(final MapFunction<C, S, E> mapFunction) {
+        this.mapFunction = mapFunction;
     }
 
-    @Override
-    public Traverser<C, E> next() {
-        return ((MapFunction<C, S, E>) 
this.function).apply(super.processNextTraverser());
+    @ProcessElement
+    public void processElement(final @Element Traverser<C, S> traverser, final 
OutputReceiver<Traverser<C, E>> output) {
+        output.output(traverser.map(this.mapFunction));
     }
 }
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/MapFunction.java
 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/OutputStep.java
similarity index 71%
copy from 
java/core/src/main/java/org/apache/tinkerpop/machine/functions/MapFunction.java
copy to 
java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/OutputStep.java
index f5b5302..8dddaea 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/MapFunction.java
+++ 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/OutputStep.java
@@ -16,15 +16,23 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.tinkerpop.machine.functions;
+package org.apache.tinkerpop.machine.beam;
 
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.tinkerpop.machine.traversers.Traverser;
 
-import java.util.function.Function;
-
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public interface MapFunction<C, S, E> extends Function<Traverser<C, S>, 
Traverser<C, E>>, CFunction<C> {
+public class OutputStep<C, S> extends DoFn<Traverser<C, S>, String> {
+
+
+    public OutputStep() {
+
+    }
 
-}
+    @ProcessElement
+    public void processElement(final @Element Traverser<C, S> traverser, final 
OutputReceiver<String> output) {
+        Beam.OUTPUT.add(traverser);
+    }
+}
\ No newline at end of file
diff --git 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/TraverserCoder.java
 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/TraverserCoder.java
new file mode 100644
index 0000000..f35cdd1
--- /dev/null
+++ 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/TraverserCoder.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.machine.beam;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.tinkerpop.machine.coefficients.LongCoefficient;
+import org.apache.tinkerpop.machine.traversers.CompleteTraverser;
+import org.apache.tinkerpop.machine.traversers.Traverser;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class TraverserCoder<C, S> extends Coder<Traverser<C, S>> {
+
+    @Override
+    public void encode(final Traverser<C, S> value, final OutputStream 
outStream) throws CoderException, IOException {
+        ObjectOutputStream outputStream = new ObjectOutputStream(outStream);
+        outputStream.writeObject(value.object());
+    }
+
+    @Override
+    public Traverser<C, S> decode(InputStream inStream) throws CoderException, 
IOException {
+        try {
+            ObjectInputStream inputStream = new ObjectInputStream(inStream);
+            return new CompleteTraverser(LongCoefficient.create(), 
inputStream.readObject());
+        } catch (final ClassNotFoundException e) {
+            throw new IOException(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public List<? extends Coder<?>> getCoderArguments() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+
+    }
+}
diff --git 
a/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java
 
b/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java
similarity index 74%
copy from 
java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java
copy to 
java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java
index 38d0df1..1a84253 100644
--- 
a/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java
+++ 
b/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java
@@ -16,27 +16,30 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.tinkerpop.machine.pipes;
+package org.apache.tinkerpop.machine.beam;
 
 import org.apache.tinkerpop.language.Gremlin;
 import org.apache.tinkerpop.language.Traversal;
 import org.apache.tinkerpop.language.TraversalSource;
 import org.apache.tinkerpop.language.TraversalUtil;
-import org.apache.tinkerpop.language.__;
 import org.apache.tinkerpop.machine.coefficients.LongCoefficient;
 import org.junit.jupiter.api.Test;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class PipesTest {
-
+public class BeamTest {
     @Test
     public void shouldWork() {
         final TraversalSource<Long> g = Gremlin.<Long>traversal()
                 .coefficient(LongCoefficient.create())
-                .processor(PipesProcessor.class);
-        final Traversal<Long, Long, Long> traversal = g.inject(7L, 10L, 
12L).as("a").c(3L).map(__.incr()).identity().incr();
+                .processor(BeamProcessor.class);
+        Traversal<Long, Long, Long> traversal = g.inject(7L, 10L, 
12L).as("a").c(3L).identity().incr();
+        System.out.println(TraversalUtil.getBytecode(traversal));
+        System.out.println(traversal);
+        System.out.println(traversal.toList());
+        System.out.println("\n----------\n");
+        traversal = g.inject(7L, 10L, 
12L).as("a").c(3L).identity().incr().is(44L);//.count();
         System.out.println(TraversalUtil.getBytecode(traversal));
         System.out.println(traversal);
         System.out.println(traversal.toList());
diff --git 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/AbstractStep.java
 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/AbstractStep.java
index 3136c38..ac6f73e 100644
--- 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/AbstractStep.java
+++ 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/AbstractStep.java
@@ -48,7 +48,7 @@ public abstract class AbstractStep<C, S, E> implements 
Step<C, S, E> {
     @Override
     public abstract Traverser<C, E> next();
 
-    protected Traverser<C, S> processNextTraverser() {
+    protected Traverser<C, S> getPreviousTraverser() {
         if (!this.traverserSet.isEmpty())
             return this.traverserSet.remove();
         else
diff --git 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/EmptyStep.java
 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/EmptyStep.java
index c1dff07..a93ea13 100644
--- 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/EmptyStep.java
+++ 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/EmptyStep.java
@@ -18,7 +18,7 @@
  */
 package org.apache.tinkerpop.machine.pipes;
 
-import org.apache.tinkerpop.machine.traversers.Traverser;
+import org.apache.tinkerpop.machine.traversers.CompleteTraverser;
 import org.apache.tinkerpop.util.FastNoSuchElementException;
 
 /**
@@ -38,7 +38,7 @@ public final class EmptyStep<C, S, E> extends AbstractStep<C, 
S, E> {
     }
 
     @Override
-    public Traverser<C, E> next() {
+    public CompleteTraverser<C, E> next() {
         throw FastNoSuchElementException.instance();
     }
 
diff --git 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/FilterStep.java
 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/FilterStep.java
index eb1d1e2..aeee847 100644
--- 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/FilterStep.java
+++ 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/FilterStep.java
@@ -26,17 +26,44 @@ import org.apache.tinkerpop.machine.traversers.Traverser;
  */
 public class FilterStep<C, S> extends AbstractStep<C, S, S> {
 
+    private final FilterFunction<C, S> filterFunction;
+    private Traverser<C, S> nextTraverser = null;
+
     public FilterStep(final AbstractStep<C, ?, S> previousStep, final 
FilterFunction<C, S> filterFunction) {
         super(previousStep, filterFunction);
+        this.filterFunction = filterFunction;
     }
 
     @Override
     public Traverser<C, S> next() {
-        Traverser<C, S> traverser;
-        while (true) {
-            traverser = this.processNextTraverser();
-            if (((FilterFunction<C, S>) this.function).test(traverser))
-                return traverser;
+        if (null != this.nextTraverser) {
+            final Traverser<C, S> traverser = this.nextTraverser;
+            this.nextTraverser = null;
+            return traverser;
+        } else {
+            Traverser<C, S> traverser;
+            while (true) {
+                traverser = this.getPreviousTraverser();
+                if (traverser.filter(this.filterFunction))
+                    return traverser;
+            }
+        }
+    }
+
+    @Override
+    public boolean hasNext() {
+        if (null != this.nextTraverser)
+            return true;
+        else {
+            Traverser<C, S> traverser;
+            while (super.hasNext()) {
+                traverser = this.getPreviousTraverser();
+                if (traverser.filter(this.filterFunction)) {
+                    this.nextTraverser = traverser;
+                    return true;
+                }
+            }
+            return false;
         }
     }
 }
diff --git 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/InitialStep.java
 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/InitialStep.java
index 31d3c3e..ef4e075 100644
--- 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/InitialStep.java
+++ 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/InitialStep.java
@@ -20,6 +20,7 @@ package org.apache.tinkerpop.machine.pipes;
 
 import org.apache.tinkerpop.machine.functions.InitialFunction;
 import org.apache.tinkerpop.machine.traversers.Traverser;
+import org.apache.tinkerpop.machine.traversers.TraverserFactory;
 
 import java.util.Collections;
 import java.util.Iterator;
@@ -29,11 +30,13 @@ import java.util.Iterator;
  */
 public class InitialStep<C, S> extends AbstractStep<C, S, S> {
 
-    private Iterator<Traverser<C, S>> objects;
+    private Iterator<S> objects;
+    private final TraverserFactory<C, S> traverserFactory;
 
-    public InitialStep(final InitialFunction<C, S> initialFunction) {
+    public InitialStep(final InitialFunction<C, S> initialFunction, final 
TraverserFactory<C, S> traverserFactory) {
         super(EmptyStep.instance(), initialFunction);
         this.objects = initialFunction.get();
+        this.traverserFactory = traverserFactory;
     }
 
     @Override
@@ -43,7 +46,7 @@ public class InitialStep<C, S> extends AbstractStep<C, S, S> {
 
     @Override
     public Traverser<C, S> next() {
-        return this.objects.next();
+        return this.traverserFactory.create(this.function.coefficient(), 
this.objects.next());
     }
 
     @Override
diff --git 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/MapStep.java
 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/MapStep.java
index 1216491..fa8b49f 100644
--- 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/MapStep.java
+++ 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/MapStep.java
@@ -26,12 +26,15 @@ import org.apache.tinkerpop.machine.traversers.Traverser;
  */
 public class MapStep<C, S, E> extends AbstractStep<C, S, E> {
 
+    private final MapFunction<C, S, E> mapFunction;
+
     public MapStep(final AbstractStep<C, ?, S> previousStep, final 
MapFunction<C, S, E> mapFunction) {
         super(previousStep, mapFunction);
+        this.mapFunction = mapFunction;
     }
 
     @Override
     public Traverser<C, E> next() {
-        return ((MapFunction<C, S, E>) 
this.function).apply(super.processNextTraverser());
+        return super.getPreviousTraverser().map(this.mapFunction);
     }
 }
diff --git 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java
 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java
index bc5028d..b38ecd4 100644
--- 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java
+++ 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java
@@ -25,8 +25,11 @@ import org.apache.tinkerpop.machine.functions.FilterFunction;
 import org.apache.tinkerpop.machine.functions.InitialFunction;
 import org.apache.tinkerpop.machine.functions.MapFunction;
 import org.apache.tinkerpop.machine.functions.NestedFunction;
+import org.apache.tinkerpop.machine.functions.ReduceFunction;
+import org.apache.tinkerpop.machine.functions.reduce.BasicReducer;
 import org.apache.tinkerpop.machine.processor.Processor;
 import org.apache.tinkerpop.machine.traversers.Traverser;
+import org.apache.tinkerpop.machine.traversers.TraverserFactory;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -40,18 +43,21 @@ public class Pipes<C, S, E> implements Processor<C, S, E> {
     private Step<C, ?, E> endStep;
     private Step<C, S, ?> startStep = EmptyStep.instance();
 
-    private Pipes(final List<CFunction<C>> functions) {
+    private Pipes(final List<CFunction<C>> functions, final 
TraverserFactory<C, S> traverserFactory) {
         AbstractStep<C, ?, ?> previousStep = EmptyStep.instance();
         for (final CFunction<?> function : functions) {
             if (function instanceof NestedFunction)
-                ((NestedFunction<C, ?, ?>) function).setProcessor(new 
Pipes<>(((NestedFunction<C, ?, ?>) function).getFunctions()));
+                ((NestedFunction<C, ?, ?>) function).setProcessor(new 
Pipes(((NestedFunction<C, S, E>) function).getFunctions(), traverserFactory));
             final AbstractStep nextStep;
             if (function instanceof FilterFunction)
                 nextStep = new FilterStep(previousStep, (FilterFunction<C, ?>) 
function);
             else if (function instanceof MapFunction)
                 nextStep = new MapStep(previousStep, (MapFunction<C, ?, ?>) 
function);
             else if (function instanceof InitialFunction)
-                nextStep = new InitialStep<>((InitialFunction<C, S>) function);
+                // TODO: traverser factory
+                nextStep = new InitialStep((InitialFunction<C, S>) function, 
traverserFactory);
+            else if (function instanceof ReduceFunction)
+                nextStep = new ReduceStep(previousStep, (ReduceFunction<C, ?, 
?>) function, new BasicReducer<>(((ReduceFunction<C, ?, ?>) 
function).getInitialValue()), traverserFactory);
             else
                 throw new RuntimeException("You need a new step type:" + 
function);
 
@@ -65,7 +71,7 @@ public class Pipes<C, S, E> implements Processor<C, S, E> {
     }
 
     public Pipes(final Bytecode<C> bytecode) {
-        this(BytecodeUtil.compile(BytecodeUtil.optimize(bytecode)));
+        this(BytecodeUtil.compile(BytecodeUtil.optimize(bytecode)), 
bytecode.getTraverserFactory());
     }
 
     @Override
diff --git 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/ReduceStep.java
 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/ReduceStep.java
new file mode 100644
index 0000000..574f2b9
--- /dev/null
+++ 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/ReduceStep.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.machine.pipes;
+
+import org.apache.tinkerpop.machine.functions.ReduceFunction;
+import org.apache.tinkerpop.machine.functions.reduce.Reducer;
+import org.apache.tinkerpop.machine.traversers.Traverser;
+import org.apache.tinkerpop.machine.traversers.TraverserFactory;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class ReduceStep<C, S, E> extends AbstractStep<C, S, E> {
+
+    private final ReduceFunction<C, S, E> reduceFunction;
+    private final Reducer<E> reducer;
+    private final TraverserFactory<C, E> traverserFactory;
+    private boolean done = false;
+
+    public ReduceStep(final AbstractStep<C, ?, S> previousStep,
+                      final ReduceFunction<C, S, E> reduceFunction,
+                      final Reducer<E> reducer,
+                      final TraverserFactory<C, E> traverserFactory) {
+        super(previousStep, reduceFunction);
+        this.reduceFunction = reduceFunction;
+        this.reducer = reducer;
+        this.traverserFactory = traverserFactory;
+    }
+
+    @Override
+    public Traverser<C, E> next() {
+        this.done = true;
+        Traverser<C, S> traverser = null;
+        while (this.hasNext()) {
+            traverser = getPreviousTraverser();
+            this.reducer.update(this.reduceFunction.apply(traverser, 
this.reducer.get()));
+        }
+        return null == traverser ?
+                this.traverserFactory.create(this.function.coefficient(), 
this.reduceFunction.getInitialValue()) :
+                traverser.reduce(this.reducer);
+    }
+
+    @Override
+    public boolean hasNext() {
+        return !this.done;
+    }
+}
diff --git 
a/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java
 
b/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java
index 38d0df1..8a7fd48 100644
--- 
a/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java
+++ 
b/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java
@@ -36,7 +36,12 @@ public class PipesTest {
         final TraversalSource<Long> g = Gremlin.<Long>traversal()
                 .coefficient(LongCoefficient.create())
                 .processor(PipesProcessor.class);
-        final Traversal<Long, Long, Long> traversal = g.inject(7L, 10L, 
12L).as("a").c(3L).map(__.incr()).identity().incr();
+        Traversal<Long, Long, Long> traversal = g.inject(7L, 10L, 
12L).as("a").c(3L).map(__.incr()).identity().incr();
+        System.out.println(TraversalUtil.getBytecode(traversal));
+        System.out.println(traversal);
+        System.out.println(traversal.toList());
+        System.out.println("\n----------\n");
+        traversal = g.inject(7L, 10L, 
12L).as("a").c(3L).map(__.incr()).identity().incr().is(44L).count();
         System.out.println(TraversalUtil.getBytecode(traversal));
         System.out.println(traversal);
         System.out.println(traversal.toList());
diff --git a/java/machine/pom.xml b/java/machine/pom.xml
index 8aef974..8fa7e4f 100644
--- a/java/machine/pom.xml
+++ b/java/machine/pom.xml
@@ -26,5 +26,6 @@ limitations under the License.
     <artifactId>machine</artifactId>
     <modules>
         <module>pipes</module>
+        <module>beam</module>
     </modules>
 </project>
\ No newline at end of file
diff --git a/java/pom.xml b/java/pom.xml
index 6ee5dd1..3887958 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -170,7 +170,8 @@ limitations under the License.
                     <artifactId>maven-surefire-plugin</artifactId>
                     <version>2.22.1</version>
                     <configuration>
-                        
<argLine>-Dlog4j.configuration=${log4j-test.properties} 
-Dbuild.dir=${project.build.directory} -Dis.testing=true
+                        
<argLine>-Dlog4j.configuration=${log4j-test.properties} 
-Dbuild.dir=${project.build.directory}
+                            -Dis.testing=true
                         </argLine>
                         <excludes>
                             <exclude>**/*IntegrateTest.java</exclude>

Reply via email to