This is an automated email from the ASF dual-hosted git repository.

okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit 4ee2c36b57d1f1fdb888d6c32ffdf5c3fedb4ac1
Author: Marko A. Rodriguez <okramma...@gmail.com>
AuthorDate: Mon Mar 11 09:15:37 2019 -0600

    have ReducerFn working in Apache Beam. Now we have count(). This means that 
both Pipes and Beam machines are at the same level of completion.
---
 .../tinkerpop/machine/bytecode/Bytecode.java       |  6 +-
 .../machine/functions/ReduceFunction.java          |  2 +
 .../machine/functions/reduce/CountReduce.java      |  5 ++
 .../machine/functions/reduce/Reducer.java          |  4 +-
 .../traversers/CompleteTraverserFactory.java       |  5 +-
 .../machine/traversers/TraverserFactory.java       |  6 +-
 .../tinkerpop/machine/beam/BasicAccumulator.java}  | 51 ++++++++--------
 .../org/apache/tinkerpop/machine/beam/Beam.java    | 20 +++++--
 .../java/org/apache/tinkerpop/machine/beam/Fn.java |  2 +
 .../apache/tinkerpop/machine/beam/ReduceFn.java    | 68 +++++++++++++++++++---
 .../tinkerpop/machine/beam/ReducerCoder.java       | 62 ++++++++++++++++++++
 .../apache/tinkerpop/machine/beam/BeamTest.java    |  2 +-
 .../tinkerpop/machine/pipes/InitialStep.java       |  4 +-
 .../org/apache/tinkerpop/machine/pipes/Pipes.java  |  2 +-
 .../apache/tinkerpop/machine/pipes/ReduceStep.java |  4 +-
 15 files changed, 191 insertions(+), 52 deletions(-)

diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/Bytecode.java 
b/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/Bytecode.java
index 3310070..74d4d3e 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/Bytecode.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/Bytecode.java
@@ -31,8 +31,8 @@ import java.util.List;
  */
 public class Bytecode<C> implements Cloneable {
 
-    public List<Strategy> strategies = new ArrayList<>();
-    public List<Instruction<C>> instructions = new ArrayList<>();
+    private List<Strategy> strategies = new ArrayList<>();
+    private List<Instruction<C>> instructions = new ArrayList<>();
 
 
     public void addStrategy(final Strategy strategy) {
@@ -63,7 +63,7 @@ public class Bytecode<C> implements Cloneable {
     }
 
     // this should be part of processor!
-    public <S> TraverserFactory<C, S> getTraverserFactory() {
+    public <S> TraverserFactory<C> getTraverserFactory() {
         return new CompleteTraverserFactory<>();
     }
 
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/ReduceFunction.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/ReduceFunction.java
index c2ceb0d..eac368d 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/ReduceFunction.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/ReduceFunction.java
@@ -28,5 +28,7 @@ import java.util.function.BiFunction;
 public interface ReduceFunction<C, S, E> extends BiFunction<Traverser<C, S>, 
E, E>, CFunction<C> {
 
     public E getInitialValue();
+
+    public E merge(final E valueA, final E valueB);
 }
 
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/CountReduce.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/CountReduce.java
index 8c674c3..65dde8e 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/CountReduce.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/CountReduce.java
@@ -40,6 +40,11 @@ public class CountReduce<C, S> extends AbstractFunction<C, 
S, Long> implements R
     }
 
     @Override
+    public Long merge(final Long valueA, final Long valueB) {
+        return valueA + valueB;
+    }
+
+    @Override
     public Long getInitialValue() {
         return 0L;
     }
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/Reducer.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/Reducer.java
index 8434ca7..8103459 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/Reducer.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/Reducer.java
@@ -18,10 +18,12 @@
  */
 package org.apache.tinkerpop.machine.functions.reduce;
 
+import java.io.Serializable;
+
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public interface Reducer<S> {
+public interface Reducer<S> extends Serializable {
 
     public S get();
 
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/CompleteTraverserFactory.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/CompleteTraverserFactory.java
index 7d9356f..59ed747 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/CompleteTraverserFactory.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/CompleteTraverserFactory.java
@@ -23,9 +23,10 @@ import org.apache.tinkerpop.machine.coefficients.Coefficient;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class CompleteTraverserFactory<C, S> implements TraverserFactory<C, S> {
+public class CompleteTraverserFactory<C> implements TraverserFactory<C> {
+
     @Override
-    public Traverser<C, S> create(final Coefficient<C> coefficient, final S 
object) {
+    public <S> Traverser<C, S> create(final Coefficient<C> coefficient, final 
S object) {
         return new CompleteTraverser<>(coefficient.clone(), object);
     }
 }
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/TraverserFactory.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/TraverserFactory.java
index 22f2911..0160e30 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/TraverserFactory.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/TraverserFactory.java
@@ -20,10 +20,12 @@ package org.apache.tinkerpop.machine.traversers;
 
 import org.apache.tinkerpop.machine.coefficients.Coefficient;
 
+import java.io.Serializable;
+
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public interface TraverserFactory<C, S> {
+public interface TraverserFactory<C> extends Serializable {
 
-    public Traverser<C, S> create(final Coefficient<C> coefficient, final S 
object);
+    public <S> Traverser<C, S> create(final Coefficient<C> coefficient, final 
S object);
 }
diff --git 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/ReduceStep.java
 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BasicAccumulator.java
similarity index 51%
copy from 
java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/ReduceStep.java
copy to 
java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BasicAccumulator.java
index 574f2b9..0c05d7b 100644
--- 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/ReduceStep.java
+++ 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BasicAccumulator.java
@@ -16,48 +16,49 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.tinkerpop.machine.pipes;
+package org.apache.tinkerpop.machine.beam;
 
+import org.apache.beam.sdk.coders.DefaultCoder;
+import org.apache.beam.sdk.transforms.Combine;
 import org.apache.tinkerpop.machine.functions.ReduceFunction;
-import org.apache.tinkerpop.machine.functions.reduce.Reducer;
 import org.apache.tinkerpop.machine.traversers.Traverser;
 import org.apache.tinkerpop.machine.traversers.TraverserFactory;
 
+import java.io.Serializable;
+
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class ReduceStep<C, S, E> extends AbstractStep<C, S, E> {
+@DefaultCoder(ReducerCoder.class)
+public class BasicAccumulator<C, S, E> implements 
Combine.AccumulatingCombineFn.Accumulator<Traverser<C, S>, BasicAccumulator<C, 
S, E>, Traverser<C, E>>, Serializable {
 
+    private E value;
     private final ReduceFunction<C, S, E> reduceFunction;
-    private final Reducer<E> reducer;
-    private final TraverserFactory<C, E> traverserFactory;
-    private boolean done = false;
-
-    public ReduceStep(final AbstractStep<C, ?, S> previousStep,
-                      final ReduceFunction<C, S, E> reduceFunction,
-                      final Reducer<E> reducer,
-                      final TraverserFactory<C, E> traverserFactory) {
-        super(previousStep, reduceFunction);
+    private final TraverserFactory<C> traverserFactory;
+
+    public BasicAccumulator(final ReduceFunction<C, S, E> reduceFunction, 
final TraverserFactory<C> traverserFactory) {
+        super();
+        this.value = reduceFunction.getInitialValue();
         this.reduceFunction = reduceFunction;
-        this.reducer = reducer;
         this.traverserFactory = traverserFactory;
     }
 
+    public void setValue(final E value) {
+        this.value = value;
+    }
+
+    @Override
+    public void addInput(final Traverser<C, S> input) {
+        this.value = reduceFunction.apply(input, this.value);
+    }
+
     @Override
-    public Traverser<C, E> next() {
-        this.done = true;
-        Traverser<C, S> traverser = null;
-        while (this.hasNext()) {
-            traverser = getPreviousTraverser();
-            this.reducer.update(this.reduceFunction.apply(traverser, 
this.reducer.get()));
-        }
-        return null == traverser ?
-                this.traverserFactory.create(this.function.coefficient(), 
this.reduceFunction.getInitialValue()) :
-                traverser.reduce(this.reducer);
+    public void mergeAccumulator(final BasicAccumulator<C, S, E> other) {
+        this.value = 
this.reduceFunction.apply(this.traverserFactory.create(this.reduceFunction.coefficient(),
 (S) this.value), other.value);
     }
 
     @Override
-    public boolean hasNext() {
-        return !this.done;
+    public Traverser<C, E> extractOutput() {
+        return this.traverserFactory.create(this.reduceFunction.coefficient(), 
this.value);
     }
 }
diff --git 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java
index f2813b4..0040bf7 100644
--- 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java
+++ 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java
@@ -20,6 +20,7 @@ package org.apache.tinkerpop.machine.beam;
 
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -34,6 +35,7 @@ import org.apache.tinkerpop.machine.functions.MapFunction;
 import org.apache.tinkerpop.machine.functions.ReduceFunction;
 import org.apache.tinkerpop.machine.processor.Processor;
 import org.apache.tinkerpop.machine.traversers.CompleteTraverser;
+import org.apache.tinkerpop.machine.traversers.CompleteTraverserFactory;
 import org.apache.tinkerpop.machine.traversers.Traverser;
 
 import java.util.ArrayList;
@@ -47,7 +49,7 @@ public class Beam<C, S, E> implements Processor<C, S, E> {
 
     private final Pipeline pipeline;
     public static List<Traverser> OUTPUT = new ArrayList<>(); // FIX THIS!
-    private final List<DoFn> functions = new ArrayList<>();
+    private final List<Fn> functions = new ArrayList<>();
     Iterator<Traverser> iterator = null;
 
     public Beam(final List<CFunction<C>> functions) {
@@ -65,14 +67,22 @@ public class Beam<C, S, E> implements Processor<C, S, E> {
             } else if (function instanceof MapFunction) {
                 fn = new MapFn<>((MapFunction) function);
             } else if (function instanceof ReduceFunction) {
-                //fn = new ReduceFn<>((ReduceFunction)function)
+                final ReduceFn combine = new ReduceFn<>((ReduceFunction) 
function, new CompleteTraverserFactory<>());
+                collection = (PCollection) 
collection.apply(Combine.globally(combine));
+                this.functions.add(combine);
             } else
                 throw new RuntimeException("You need a new step type:" + 
function);
-            this.functions.add(fn);
-            collection = (PCollection) collection.apply(ParDo.of(fn));
+
+            if (!(function instanceof ReduceFunction)) {
+                this.functions.add((Fn) fn);
+                collection = (PCollection) collection.apply(ParDo.of(fn));
+
+            }
             collection.setCoder(new TraverserCoder());
+
+
         }
-        collection = (PCollection) collection.apply(ParDo.of(new 
OutputStep()));
+        collection.apply(ParDo.of(new OutputStep()));
         this.pipeline.getOptions().setRunner(new 
PipelineOptions.DirectRunner().create(this.pipeline.getOptions()));
     }
 
diff --git 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Fn.java 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Fn.java
index 1967b96..ca0a357 100644
--- a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Fn.java
+++ b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Fn.java
@@ -20,6 +20,8 @@ package org.apache.tinkerpop.machine.beam;
 
 import org.apache.tinkerpop.machine.traversers.Traverser;
 
+import java.io.Serializable;
+
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
diff --git 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/ReduceFn.java
 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/ReduceFn.java
index 37a46ee..2e9353d 100644
--- 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/ReduceFn.java
+++ 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/ReduceFn.java
@@ -18,25 +18,77 @@
  */
 package org.apache.tinkerpop.machine.beam;
 
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.transforms.Combine;
 import org.apache.tinkerpop.machine.functions.ReduceFunction;
-import org.apache.tinkerpop.machine.functions.reduce.Reducer;
+import org.apache.tinkerpop.machine.traversers.Traverser;
 import org.apache.tinkerpop.machine.traversers.TraverserFactory;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class ReduceFn<C, S, E> extends AbstractFn<C, S, E> {
+public class ReduceFn<C, S, E> extends Combine.CombineFn<Traverser<C, S>, 
BasicAccumulator<C, S, E>, Traverser<C, E>> implements Fn<C, S, E> {
 
     private final ReduceFunction<C, S, E> reduceFunction;
-    private final Reducer<E> reducer;
-    private final TraverserFactory<C, E> traverserFactory;
+    private final TraverserFactory<C> traverserFactory;
+
 
     public ReduceFn(final ReduceFunction<C, S, E> reduceFunction,
-                    final Reducer<E> reducer,
-                    final TraverserFactory<C, E> traverserFactory) {
-        super(reduceFunction);
+                    final TraverserFactory<C> traverserFactory) {
+        //super(reduceFunction);
         this.reduceFunction = reduceFunction;
-        this.reducer = reducer;
         this.traverserFactory = traverserFactory;
     }
+
+
+    @Override
+    public void addStart(Traverser<C, S> traverser) {
+
+    }
+
+
+    @Override
+    public BasicAccumulator<C, S, E> createAccumulator() {
+        return new BasicAccumulator<>(this.reduceFunction, 
this.traverserFactory);
+    }
+
+    @Override
+    public BasicAccumulator<C, S, E> addInput(BasicAccumulator<C, S, E> 
accumulator, Traverser<C, S> input) {
+        accumulator.addInput(input);
+        return accumulator;
+    }
+
+    @Override
+    public BasicAccumulator<C, S, E> 
mergeAccumulators(Iterable<BasicAccumulator<C, S, E>> accumulators) {
+        E value = this.reduceFunction.getInitialValue();
+        for (final BasicAccumulator accumulator : accumulators) {
+            value = this.reduceFunction.merge(value, (E) 
accumulator.extractOutput().object());
+        }
+
+        final BasicAccumulator<C, S, E> accumulator = new 
BasicAccumulator<>(this.reduceFunction, this.traverserFactory);
+        accumulator.setValue(value);
+        return accumulator;
+    }
+
+    @Override
+    public Traverser<C, E> extractOutput(BasicAccumulator<C, S, E> 
accumulator) {
+        return accumulator.extractOutput();
+    }
+
+    @Override
+    public Coder<BasicAccumulator<C, S, E>> getAccumulatorCoder(CoderRegistry 
registry, Coder<Traverser<C, S>> inputCoder) throws CannotProvideCoderException 
{
+        return new ReducerCoder<>();
+    }
+
+    @Override
+    public Coder<Traverser<C, E>> getDefaultOutputCoder(CoderRegistry 
registry, Coder<Traverser<C, S>> inputCoder) throws CannotProvideCoderException 
{
+        return new TraverserCoder<>();
+    }
+
+    @Override
+    public String toString() {
+        return this.reduceFunction.toString();
+    }
 }
diff --git 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/ReducerCoder.java
 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/ReducerCoder.java
new file mode 100644
index 0000000..dc154bc
--- /dev/null
+++ 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/ReducerCoder.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.machine.beam;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class ReducerCoder<C, S, E> extends Coder<BasicAccumulator<C, S, E>> {
+
+    @Override
+    public void encode(final BasicAccumulator<C, S, E> value, final 
OutputStream outStream) throws CoderException, IOException {
+        ObjectOutputStream outputStream = new ObjectOutputStream(outStream);
+        outputStream.writeObject(value);
+    }
+
+    @Override
+    public BasicAccumulator<C, S, E> decode(InputStream inStream) throws 
CoderException, IOException {
+        try {
+            ObjectInputStream inputStream = new ObjectInputStream(inStream);
+            return (BasicAccumulator<C, S, E>) inputStream.readObject();
+        } catch (final ClassNotFoundException e) {
+            throw new IOException(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public List<? extends Coder<?>> getCoderArguments() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+
+    }
+}
diff --git 
a/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java
 
b/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java
index 0c30723..39d3786 100644
--- 
a/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java
+++ 
b/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java
@@ -40,7 +40,7 @@ public class BeamTest {
         System.out.println(traversal);
         System.out.println(traversal.toList());
         System.out.println("\n----------\n");
-        traversal = g.inject(7L, 10L, 
12L).as("a").c(3L).map(__.incr()).identity().incr().is(9L);//.count();
+        traversal = g.inject(7L, 10L, 
12L).as("a").c(3L).map(__.incr()).identity().incr().count();
         System.out.println(TraversalUtil.getBytecode(traversal));
         System.out.println(traversal);
         System.out.println(traversal.toList());
diff --git 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/InitialStep.java
 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/InitialStep.java
index ef4e075..0138ed3 100644
--- 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/InitialStep.java
+++ 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/InitialStep.java
@@ -31,9 +31,9 @@ import java.util.Iterator;
 public class InitialStep<C, S> extends AbstractStep<C, S, S> {
 
     private Iterator<S> objects;
-    private final TraverserFactory<C, S> traverserFactory;
+    private final TraverserFactory<C> traverserFactory;
 
-    public InitialStep(final InitialFunction<C, S> initialFunction, final 
TraverserFactory<C, S> traverserFactory) {
+    public InitialStep(final InitialFunction<C, S> initialFunction, final 
TraverserFactory<C> traverserFactory) {
         super(EmptyStep.instance(), initialFunction);
         this.objects = initialFunction.get();
         this.traverserFactory = traverserFactory;
diff --git 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java
 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java
index 29f64bf..606e0ad 100644
--- 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java
+++ 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java
@@ -43,7 +43,7 @@ public class Pipes<C, S, E> implements Processor<C, S, E> {
     private Step<C, ?, E> endStep;
     private Step<C, S, ?> startStep = EmptyStep.instance();
 
-    public Pipes(final List<CFunction<C>> functions, final TraverserFactory<C, 
S> traverserFactory) {
+    public Pipes(final List<CFunction<C>> functions, final TraverserFactory<C> 
traverserFactory) {
         AbstractStep<C, ?, ?> previousStep = EmptyStep.instance();
         for (final CFunction<?> function : functions) {
             if (function instanceof NestedFunction)
diff --git 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/ReduceStep.java
 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/ReduceStep.java
index 574f2b9..19e4cbc 100644
--- 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/ReduceStep.java
+++ 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/ReduceStep.java
@@ -30,13 +30,13 @@ public class ReduceStep<C, S, E> extends AbstractStep<C, S, 
E> {
 
     private final ReduceFunction<C, S, E> reduceFunction;
     private final Reducer<E> reducer;
-    private final TraverserFactory<C, E> traverserFactory;
+    private final TraverserFactory<C> traverserFactory;
     private boolean done = false;
 
     public ReduceStep(final AbstractStep<C, ?, S> previousStep,
                       final ReduceFunction<C, S, E> reduceFunction,
                       final Reducer<E> reducer,
-                      final TraverserFactory<C, E> traverserFactory) {
+                      final TraverserFactory<C> traverserFactory) {
         super(previousStep, reduceFunction);
         this.reduceFunction = reduceFunction;
         this.reducer = reducer;

Reply via email to