This is an automated email from the ASF dual-hosted git repository.

okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/tp4 by this push:
     new d4e4503  add CoefficientStrategy which knows how to propagate the 
unity coefficient to all child bytecode. Also, it assumes LongCoefficient (95 
percent of cases) if no withCoefficient() is provided to the TraversalSource. 
Added our first by() modulation support to path()-step. CompilationRing is used 
in an analagous fashion to FunctionRing in TP3. Finally cleaned up 
ProcessorFactory to now take a Compilation. There is a hard line where Bytecode 
is no longer the currency of the ma [...]
d4e4503 is described below

commit d4e4503c342efd2e9fb4aff885b10f07bb823079
Author: Marko A. Rodriguez <[email protected]>
AuthorDate: Tue Mar 12 07:57:36 2019 -0600

    add CoefficientStrategy which knows how to propagate the unity coefficient 
to all child bytecode. Also, it assumes LongCoefficient (95 percent of cases) 
if no withCoefficient() is provided to the TraversalSource. Added our first 
by() modulation support to path()-step. CompilationRing is used in an analagous 
fashion to FunctionRing in TP3. Finally cleaned up ProcessorFactory to now take 
a Compilation. There is a hard line where Bytecode is no longer the currency of 
the machine, but a C [...]
---
 .../org/apache/tinkerpop/language/Traversal.java   |  4 +-
 .../apache/tinkerpop/language/TraversalSource.java |  2 +
 .../tinkerpop/machine/bytecode/BytecodeUtil.java   |  5 +--
 .../tinkerpop/machine/bytecode/Compilation.java    | 17 ++++++++-
 .../CompilationRing.java}                          | 44 +++++++++++++---------
 .../tinkerpop/machine/functions/map/PathMap.java   | 18 ++++++++-
 .../machine/processor/EmptyProcessorFactory.java   |  5 +--
 .../machine/processor/ProcessorFactory.java        | 11 +-----
 .../machine/strategies/CoefficientStrategy.java    | 21 ++++++++++-
 .../org/apache/tinkerpop/machine/beam/Beam.java    | 29 ++++++--------
 .../tinkerpop/machine/beam/BeamProcessor.java      |  7 ++--
 .../apache/tinkerpop/machine/beam/BeamTest.java    |  5 ++-
 .../org/apache/tinkerpop/machine/pipes/Pipes.java  | 11 +++---
 .../tinkerpop/machine/pipes/PipesProcessor.java    | 10 ++---
 .../apache/tinkerpop/machine/pipes/ReduceStep.java |  7 ++++
 .../tinkerpop/machine/pipes/util/BasicReducer.java |  7 ++++
 .../tinkerpop/machine/pipes/util/Reducer.java      |  2 +
 .../apache/tinkerpop/machine/pipes/PipesTest.java  |  2 +-
 18 files changed, 133 insertions(+), 74 deletions(-)

diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/language/Traversal.java 
b/java/core/src/main/java/org/apache/tinkerpop/language/Traversal.java
index e620314..92ae122 100644
--- a/java/core/src/main/java/org/apache/tinkerpop/language/Traversal.java
+++ b/java/core/src/main/java/org/apache/tinkerpop/language/Traversal.java
@@ -52,7 +52,7 @@ public class Traversal<C, S, E> implements Iterator<E> {
         return this;
     }
 
-    public Traversal<C, S, E> by(final Traversal<C, E, ?> byTraversal) {
+    public Traversal<C, S, E> by(final Traversal<C, ?, ?> byTraversal) {
         this.bytecode.lastInstruction().addArg(byTraversal);
         return this;
     }
@@ -120,7 +120,7 @@ public class Traversal<C, S, E> implements Iterator<E> {
 
     private final void prepareTraversal() {
         if (null == this.compilation)
-            this.compilation = Compilation.compile(bytecode);
+            this.compilation = Compilation.compile(this.bytecode);
     }
 
     @Override
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/language/TraversalSource.java 
b/java/core/src/main/java/org/apache/tinkerpop/language/TraversalSource.java
index 355ab61..e42c8dd 100644
--- a/java/core/src/main/java/org/apache/tinkerpop/language/TraversalSource.java
+++ b/java/core/src/main/java/org/apache/tinkerpop/language/TraversalSource.java
@@ -21,6 +21,7 @@ package org.apache.tinkerpop.language;
 import org.apache.tinkerpop.machine.bytecode.Bytecode;
 import org.apache.tinkerpop.machine.coefficients.Coefficient;
 import org.apache.tinkerpop.machine.processor.ProcessorFactory;
+import org.apache.tinkerpop.machine.strategies.CoefficientStrategy;
 import org.apache.tinkerpop.machine.strategies.Strategy;
 
 /**
@@ -32,6 +33,7 @@ public class TraversalSource<C> {
 
     protected TraversalSource() {
         this.bytecode = new Bytecode<>();
+        this.bytecode.addSourceInstruction(Symbols.WITH_STRATEGY, 
CoefficientStrategy.class);
     }
 
     public TraversalSource<C> withCoefficient(final Class<? extends 
Coefficient<C>> coefficient) {
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java
index 635e787..0f129f2 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java
@@ -47,11 +47,10 @@ import java.util.Set;
  */
 public final class BytecodeUtil {
 
-    public static <C> Bytecode<C> strategize(final Bytecode<C> bytecode) {
+    public static <C> void strategize(final Bytecode<C> bytecode) {
         for (final Strategy strategy : BytecodeUtil.getStrategies(bytecode)) {
             BytecodeUtil.strategize(bytecode, strategy);
         }
-        return bytecode;
     }
 
     private static <C> void strategize(final Bytecode<C> bytecode, Strategy 
strategy) {
@@ -147,7 +146,7 @@ public final class BytecodeUtil {
             case Symbols.MAP:
                 return new MapMap<>(coefficient, labels, 
Compilation.compileOne(instruction.args()[0]));
             case Symbols.PATH:
-                return new PathMap<>(coefficient, labels);
+                return new PathMap<>(coefficient, labels, 
Compilation.compile(instruction.args()));
             case Symbols.SUM:
                 return new SumReduce<>(coefficient, labels);
             case Symbols.UNION:
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/Compilation.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/Compilation.java
index b0ea933..19de363 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/Compilation.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/Compilation.java
@@ -18,6 +18,7 @@
  */
 package org.apache.tinkerpop.machine.bytecode;
 
+import org.apache.tinkerpop.machine.coefficients.Coefficient;
 import org.apache.tinkerpop.machine.functions.CFunction;
 import org.apache.tinkerpop.machine.processor.Processor;
 import org.apache.tinkerpop.machine.processor.ProcessorFactory;
@@ -37,14 +38,15 @@ public final class Compilation<C, S, E> implements 
Serializable {
     private final List<CFunction<C>> functions;
     private final ProcessorFactory processorFactory;
     private final TraverserFactory<C> traverserFactory;
+    private final Coefficient<C> unity;
     private transient Processor<C, S, E> processor;
 
     public Compilation(final Bytecode<C> bytecode) {
         BytecodeUtil.strategize(bytecode);
         this.processorFactory = 
BytecodeUtil.getProcessorFactory(bytecode).get();
         this.traverserFactory = 
BytecodeUtil.getTraverserFactory(bytecode).get();
+        this.unity = 
BytecodeUtil.getCoefficient(bytecode).get().clone().unity();
         this.functions = BytecodeUtil.compile(bytecode);
-
     }
 
     public Processor<C, S, E> getProcessor() {
@@ -59,7 +61,7 @@ public final class Compilation<C, S, E> implements 
Serializable {
 
     private void prepareProcessor() {
         if (null == this.processor)
-            this.processor = this.processorFactory.mint(this.traverserFactory, 
this.functions);
+            this.processor = this.processorFactory.mint(this);
     }
 
     public Traverser<C, E> mapTraverser(final Traverser<C, S> traverser) {
@@ -69,6 +71,13 @@ public final class Compilation<C, S, E> implements 
Serializable {
         return this.processor.next();
     }
 
+    public Traverser<C, E> mapObject(final S object) {
+        this.reset();
+        this.prepareProcessor();
+        this.processor.addStart(this.traverserFactory.create(this.unity, 
object));
+        return this.processor.next();
+    }
+
     public Iterator<Traverser<C, E>> flatMapTraverser(final Traverser<C, S> 
traverser) {
         this.reset();
         this.prepareProcessor();
@@ -92,6 +101,10 @@ public final class Compilation<C, S, E> implements 
Serializable {
         return this.functions;
     }
 
+    public TraverserFactory<C> getTraverserFactory() {
+        return this.traverserFactory;
+    }
+
     ////////
 
     public static <C, S, E> Compilation<C, S, E> compile(final Bytecode<C> 
bytecode) {
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/processor/EmptyProcessorFactory.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/CompilationRing.java
similarity index 50%
copy from 
java/core/src/main/java/org/apache/tinkerpop/machine/processor/EmptyProcessorFactory.java
copy to 
java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/CompilationRing.java
index ea86138..3d1c280 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/processor/EmptyProcessorFactory.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/CompilationRing.java
@@ -16,37 +16,47 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.tinkerpop.machine.processor;
+package org.apache.tinkerpop.machine.bytecode;
 
-import org.apache.tinkerpop.machine.functions.CFunction;
-import org.apache.tinkerpop.machine.strategies.Strategy;
-import org.apache.tinkerpop.machine.traversers.TraverserFactory;
-
-import java.util.Collections;
+import java.io.Serializable;
 import java.util.List;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class EmptyProcessorFactory implements ProcessorFactory {
+public class CompilationRing<C, S, E> implements Serializable {
 
-    private static final EmptyProcessorFactory INSTANCE = new 
EmptyProcessorFactory();
+    private List<Compilation<C, S, E>> compilations;
+    private int currentCompilation = -1;
 
-    private EmptyProcessorFactory() {
+    public CompilationRing(final List<Compilation<C, S, E>> compilations) {
+        this.compilations = compilations;
+    }
 
+    public Compilation<C, S, E> next() {
+        if (this.compilations.isEmpty()) {
+            return null;
+        } else {
+            this.currentCompilation = (this.currentCompilation + 1) % 
this.compilations.size();
+            return this.compilations.get(this.currentCompilation);
+        }
     }
 
-    @Override
-    public <C, S, E> Processor<C, S, E> mint(final TraverserFactory<C> 
traverserFactory, final List<CFunction<C>> functions) {
-        return EmptyProcessor.instance();
+    public boolean isEmpty() {
+        return this.compilations.isEmpty();
     }
 
-    @Override
-    public List<Strategy> getStrategies() {
-        return Collections.emptyList();
+    public void reset() {
+        this.currentCompilation = -1;
     }
 
-    public static EmptyProcessorFactory instance() {
-        return INSTANCE;
+    public int size() {
+        return this.compilations.size();
+    }
+
+
+    @Override
+    public String toString() {
+        return this.compilations.toString();
     }
 }
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/PathMap.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/PathMap.java
index 4c955d2..8e66ccd 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/PathMap.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/PathMap.java
@@ -18,12 +18,15 @@
  */
 package org.apache.tinkerpop.machine.functions.map;
 
+import org.apache.tinkerpop.machine.bytecode.Compilation;
+import org.apache.tinkerpop.machine.bytecode.CompilationRing;
 import org.apache.tinkerpop.machine.coefficients.Coefficient;
 import org.apache.tinkerpop.machine.functions.AbstractFunction;
 import org.apache.tinkerpop.machine.functions.MapFunction;
 import org.apache.tinkerpop.machine.traversers.Path;
 import org.apache.tinkerpop.machine.traversers.Traverser;
 
+import java.util.List;
 import java.util.Set;
 
 /**
@@ -31,12 +34,23 @@ import java.util.Set;
  */
 public class PathMap<C, S> extends AbstractFunction<C, S, Path> implements 
MapFunction<C, S, Path> {
 
-    public PathMap(final Coefficient<C> coefficient, final Set<String> labels) 
{
+    private final CompilationRing<C, Object, Object> compilationRing;
+
+    public PathMap(final Coefficient<C> coefficient, final Set<String> labels, 
final List<Compilation<C, Object, Object>> byMaps) {
         super(coefficient, labels);
+        this.compilationRing = new CompilationRing<>(byMaps);
     }
 
     @Override
     public Path apply(final Traverser<C, S> traverser) {
-        return traverser.path();
+        if (!this.compilationRing.isEmpty()) {
+            final Path oldPath = traverser.path();
+            final Path newPath = new Path();
+            for (int i = 0; i < oldPath.size(); i++) {
+                newPath.add(oldPath.labels(i), 
this.compilationRing.next().mapObject(oldPath.object(i)).object());
+            }
+            return newPath;
+        } else
+            return traverser.path();
     }
 }
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/processor/EmptyProcessorFactory.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/processor/EmptyProcessorFactory.java
index ea86138..454c343 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/processor/EmptyProcessorFactory.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/processor/EmptyProcessorFactory.java
@@ -18,9 +18,8 @@
  */
 package org.apache.tinkerpop.machine.processor;
 
-import org.apache.tinkerpop.machine.functions.CFunction;
+import org.apache.tinkerpop.machine.bytecode.Compilation;
 import org.apache.tinkerpop.machine.strategies.Strategy;
-import org.apache.tinkerpop.machine.traversers.TraverserFactory;
 
 import java.util.Collections;
 import java.util.List;
@@ -37,7 +36,7 @@ public final class EmptyProcessorFactory implements 
ProcessorFactory {
     }
 
     @Override
-    public <C, S, E> Processor<C, S, E> mint(final TraverserFactory<C> 
traverserFactory, final List<CFunction<C>> functions) {
+    public <C, S, E> Processor<C, S, E> mint(final Compilation<C, S, E> 
compilation) {
         return EmptyProcessor.instance();
     }
 
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/processor/ProcessorFactory.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/processor/ProcessorFactory.java
index 6ffa76e..ccfae9d 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/processor/ProcessorFactory.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/processor/ProcessorFactory.java
@@ -18,11 +18,8 @@
  */
 package org.apache.tinkerpop.machine.processor;
 
-import org.apache.tinkerpop.machine.bytecode.Bytecode;
-import org.apache.tinkerpop.machine.bytecode.BytecodeUtil;
-import org.apache.tinkerpop.machine.functions.CFunction;
+import org.apache.tinkerpop.machine.bytecode.Compilation;
 import org.apache.tinkerpop.machine.strategies.Strategy;
-import org.apache.tinkerpop.machine.traversers.TraverserFactory;
 
 import java.io.Serializable;
 import java.lang.reflect.InvocationTargetException;
@@ -33,11 +30,7 @@ import java.util.List;
  */
 public interface ProcessorFactory extends Serializable {
 
-    public default <C, S, E> Processor<C, S, E> mint(final Bytecode<C> 
bytecode) {
-        return this.mint(BytecodeUtil.getTraverserFactory(bytecode).get(), 
BytecodeUtil.compile(BytecodeUtil.strategize(bytecode)));
-    }
-
-    public <C, S, E> Processor<C, S, E> mint(final TraverserFactory<C> 
traverserFactory, final List<CFunction<C>> functions);
+    public <C, S, E> Processor<C, S, E> mint(final Compilation<C, S, E> 
compilation);
 
     public List<Strategy> getStrategies();
 
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/strategies/CoefficientStrategy.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/strategies/CoefficientStrategy.java
index 051ab39..9cf7ae6 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/strategies/CoefficientStrategy.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/strategies/CoefficientStrategy.java
@@ -18,7 +18,12 @@
  */
 package org.apache.tinkerpop.machine.strategies;
 
+import org.apache.tinkerpop.language.Symbols;
 import org.apache.tinkerpop.machine.bytecode.Bytecode;
+import org.apache.tinkerpop.machine.bytecode.BytecodeUtil;
+import org.apache.tinkerpop.machine.bytecode.Instruction;
+import org.apache.tinkerpop.machine.coefficients.Coefficient;
+import org.apache.tinkerpop.machine.coefficients.LongCoefficient;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -26,6 +31,20 @@ import org.apache.tinkerpop.machine.bytecode.Bytecode;
 public final class CoefficientStrategy implements Strategy {
     @Override
     public <C> void apply(Bytecode<C> bytecode) {
-        // todo: propagate root coefficient to all child bytecode;
+        Coefficient<C> coefficient = 
BytecodeUtil.getCoefficient(bytecode).orElse(null);
+        if (null == coefficient) {
+            coefficient = (Coefficient<C>) LongCoefficient.create();
+            bytecode.addSourceInstruction(Symbols.WITH_COEFFICIENT, 
coefficient.getClass());
+        }
+        for (final Instruction<C> instruction : bytecode.getInstructions()) {
+            for (final Object arg : instruction.args()) {
+                if (arg instanceof Bytecode) {
+                    final Bytecode<C> next = (Bytecode<C>) arg;
+                    if (!BytecodeUtil.hasSourceInstruction(next, 
Symbols.WITH_COEFFICIENT)) {
+                        next.addSourceInstruction(Symbols.WITH_COEFFICIENT, 
coefficient.getClass());
+                    }
+                }
+            }
+        }
     }
 }
diff --git 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java
index 2ad5cd4..eaed2f7 100644
--- 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java
+++ 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java
@@ -27,8 +27,6 @@ import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
-import org.apache.tinkerpop.machine.bytecode.Bytecode;
-import org.apache.tinkerpop.machine.bytecode.BytecodeUtil;
 import org.apache.tinkerpop.machine.bytecode.Compilation;
 import org.apache.tinkerpop.machine.coefficients.Coefficient;
 import org.apache.tinkerpop.machine.coefficients.LongCoefficient;
@@ -52,28 +50,29 @@ import java.util.List;
  */
 public class Beam<C, S, E> implements Processor<C, S, E> {
 
-
     private final Pipeline pipeline;
     public static List<Traverser> OUTPUT = new ArrayList<>(); // FIX THIS!
     private final List<Fn> functions = new ArrayList<>();
     Iterator<Traverser<C, E>> iterator = null;
-    private TraverserFactory<C> traverserFactory;
 
+    public Beam(final Compilation<C, S, E> compilation) {
 
-    public Beam(final TraverserFactory<C> traverserFactory, final 
List<CFunction<C>> functions) {
-        this.traverserFactory = traverserFactory;
         this.pipeline = Pipeline.create();
         
this.pipeline.getCoderRegistry().registerCoderForClass(Traverser.class, new 
TraverserCoder<>());
-        PCollection<Traverser<C, ?>> collection = 
this.pipeline.apply(Create.of(traverserFactory.create((Coefficient) 
LongCoefficient.create(), 1L)));
+        PCollection<Traverser<C, ?>> collection = 
this.pipeline.apply(Create.of(compilation.getTraverserFactory().create((Coefficient)
 LongCoefficient.create(), 1L)));
         collection.setCoder(new TraverserCoder());
-        for (final CFunction<?> function : functions) {
-            collection = processFunction(collection, function, false);
+        for (final CFunction<?> function : compilation.getFunctions()) {
+            collection = processFunction(collection, 
compilation.getTraverserFactory(), function, false);
         }
         collection.apply(ParDo.of(new OutputStep()));
         this.pipeline.getOptions().setRunner(new 
PipelineOptions.DirectRunner().create(this.pipeline.getOptions()));
     }
 
-    private PCollection<Traverser<C, ?>> 
processFunction(PCollection<Traverser<C, ?>> collection, final CFunction<?> 
function, final boolean branching) {
+    private PCollection<Traverser<C, ?>> processFunction(
+            PCollection<Traverser<C, ?>> collection,
+            final TraverserFactory<C> traverserFactory,
+            final CFunction<?> function,
+            final boolean branching) {
         DoFn<Traverser<C, S>, Traverser<C, E>> fn = null;
         if (function instanceof BranchFunction) {
             final List<Compilation<C, ?, ?>> branches = ((BranchFunction<C, ?, 
?>) function).getInternals();
@@ -81,14 +80,14 @@ public class Beam<C, S, E> implements Processor<C, S, E> {
             for (final Compilation<C, ?, ?> branch : branches) {
                 PCollection<Traverser<C, ?>> branchCollection = collection;
                 for (final CFunction<C> branchFunction : 
branch.getFunctions()) {
-                    branchCollection = this.processFunction(branchCollection, 
branchFunction, true);
+                    branchCollection = this.processFunction(branchCollection, 
traverserFactory, branchFunction, true);
                 }
                 collections.add(branchCollection);
             }
             collection = 
PCollectionList.of(collections).apply(Flatten.pCollections());
             this.functions.add(new BranchFn<>((BranchFunction<C, S, E>) 
function));
         } else if (function instanceof InitialFunction) {
-            fn = new InitialFn((InitialFunction<C, S>) function, 
this.traverserFactory);
+            fn = new InitialFn((InitialFunction<C, S>) function, 
traverserFactory);
         } else if (function instanceof FilterFunction) {
             fn = new FilterFn((FilterFunction<C, S>) function);
         } else if (function instanceof FlatMapFunction) {
@@ -96,7 +95,7 @@ public class Beam<C, S, E> implements Processor<C, S, E> {
         } else if (function instanceof MapFunction) {
             fn = new MapFn<>((MapFunction<C, S, E>) function);
         } else if (function instanceof ReduceFunction) {
-            final ReduceFn<C, S, E> combine = new 
ReduceFn<>((ReduceFunction<C, S, E>) function, this.traverserFactory);
+            final ReduceFn<C, S, E> combine = new 
ReduceFn<>((ReduceFunction<C, S, E>) function, traverserFactory);
             collection = (PCollection<Traverser<C, ?>>) 
collection.apply(Combine.globally((ReduceFn) combine));
             this.functions.add(combine);
         } else
@@ -111,10 +110,6 @@ public class Beam<C, S, E> implements Processor<C, S, E> {
         return collection;
     }
 
-    public Beam(final Bytecode<C> bytecode) {
-        this(BytecodeUtil.getTraverserFactory(bytecode).get(), 
BytecodeUtil.compile(BytecodeUtil.strategize(bytecode)));
-    }
-
     @Override
     public void addStart(final Traverser<C, S> traverser) {
         this.functions.get(0).addStart(traverser);
diff --git 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BeamProcessor.java
 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BeamProcessor.java
index 3094ac1..802c2e4 100644
--- 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BeamProcessor.java
+++ 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BeamProcessor.java
@@ -19,11 +19,10 @@
 package org.apache.tinkerpop.machine.beam;
 
 import org.apache.tinkerpop.machine.beam.strategies.BeamStrategy;
-import org.apache.tinkerpop.machine.functions.CFunction;
+import org.apache.tinkerpop.machine.bytecode.Compilation;
 import org.apache.tinkerpop.machine.processor.Processor;
 import org.apache.tinkerpop.machine.processor.ProcessorFactory;
 import org.apache.tinkerpop.machine.strategies.Strategy;
-import org.apache.tinkerpop.machine.traversers.TraverserFactory;
 
 import java.util.Collections;
 import java.util.List;
@@ -34,8 +33,8 @@ import java.util.List;
 public class BeamProcessor implements ProcessorFactory {
 
     @Override
-    public <C, S, E> Processor<C, S, E> mint(TraverserFactory<C> 
traverserFactory, List<CFunction<C>> cFunctions) {
-        return new Beam<>(traverserFactory, cFunctions);
+    public <C, S, E> Processor<C, S, E> mint(final Compilation<C, S, E> 
compilation) {
+        return new Beam<>(compilation);
     }
 
     @Override
diff --git 
a/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java
 
b/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java
index 2c126e3..f392ab7 100644
--- 
a/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java
+++ 
b/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java
@@ -34,11 +34,12 @@ public class BeamTest {
     @Test
     public void shouldWork() {
         final TraversalSource<Long> g = Gremlin.<Long>traversal()
-                .withCoefficient(LongCoefficient.class)
+                //.withCoefficient(LongCoefficient.class)
                 .withProcessor(BeamProcessor.class)
                 .withStrategy(IdentityStrategy.class);
 
-        Traversal<Long, Long, Long> traversal = g.inject(7L, 10L, 
12L).as("a").c(3L).map(__.incr()).identity().incr().sum();
+        Traversal<Long, Long, Long> traversal = g.inject(7L, 10L, 
12L).identity().incr().sum();
+        
System.out.println(TraversalUtil.getBytecode(traversal).getSourceInstructions());
         System.out.println(TraversalUtil.getBytecode(traversal));
         System.out.println(traversal);
         System.out.println(traversal.toList());
diff --git 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java
 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java
index 7087418..1d19e0c 100644
--- 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java
+++ 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java
@@ -18,18 +18,17 @@
  */
 package org.apache.tinkerpop.machine.pipes;
 
+import org.apache.tinkerpop.machine.bytecode.Compilation;
 import org.apache.tinkerpop.machine.functions.BranchFunction;
 import org.apache.tinkerpop.machine.functions.CFunction;
 import org.apache.tinkerpop.machine.functions.FilterFunction;
 import org.apache.tinkerpop.machine.functions.FlatMapFunction;
 import org.apache.tinkerpop.machine.functions.InitialFunction;
-import org.apache.tinkerpop.machine.functions.InternalFunction;
 import org.apache.tinkerpop.machine.functions.MapFunction;
 import org.apache.tinkerpop.machine.functions.ReduceFunction;
 import org.apache.tinkerpop.machine.pipes.util.BasicReducer;
 import org.apache.tinkerpop.machine.processor.Processor;
 import org.apache.tinkerpop.machine.traversers.Traverser;
-import org.apache.tinkerpop.machine.traversers.TraverserFactory;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -43,9 +42,9 @@ public class Pipes<C, S, E> implements Processor<C, S, E> {
     private Step<C, ?, E> endStep;
     private Step<C, S, ?> startStep = EmptyStep.instance();
 
-    public Pipes(final TraverserFactory<C> traverserFactory, final 
List<CFunction<C>> functions) {
+    public Pipes(final Compilation<C, S, E> compilation) {
         AbstractStep<C, ?, ?> previousStep = EmptyStep.instance();
-        for (final CFunction<?> function : functions) {
+        for (final CFunction<?> function : compilation.getFunctions()) {
             final AbstractStep nextStep;
             if (function instanceof BranchFunction)
                 nextStep = new BranchStep(previousStep, (BranchFunction<C, ?, 
?>) function);
@@ -56,9 +55,9 @@ public class Pipes<C, S, E> implements Processor<C, S, E> {
             else if (function instanceof MapFunction)
                 nextStep = new MapStep(previousStep, (MapFunction<C, ?, ?>) 
function);
             else if (function instanceof InitialFunction)
-                nextStep = new InitialStep((InitialFunction<C, S>) function, 
traverserFactory);
+                nextStep = new InitialStep((InitialFunction<C, S>) function, 
compilation.getTraverserFactory());
             else if (function instanceof ReduceFunction)
-                nextStep = new ReduceStep(previousStep, (ReduceFunction<C, ?, 
?>) function, new BasicReducer<>(((ReduceFunction<C, ?, ?>) 
function).getInitialValue()), traverserFactory);
+                nextStep = new ReduceStep(previousStep, (ReduceFunction<C, ?, 
?>) function, new BasicReducer<>(((ReduceFunction<C, ?, ?>) 
function).getInitialValue()), compilation.getTraverserFactory());
             else
                 throw new RuntimeException("You need a new step type:" + 
function);
 
diff --git 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/PipesProcessor.java
 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/PipesProcessor.java
index 137db3c..1ab1eb0 100644
--- 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/PipesProcessor.java
+++ 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/PipesProcessor.java
@@ -18,12 +18,11 @@
  */
 package org.apache.tinkerpop.machine.pipes;
 
-import org.apache.tinkerpop.machine.functions.CFunction;
+import org.apache.tinkerpop.machine.bytecode.Compilation;
 import org.apache.tinkerpop.machine.pipes.strategies.PipesStrategy;
 import org.apache.tinkerpop.machine.processor.Processor;
 import org.apache.tinkerpop.machine.processor.ProcessorFactory;
 import org.apache.tinkerpop.machine.strategies.Strategy;
-import org.apache.tinkerpop.machine.traversers.TraverserFactory;
 
 import java.util.Collections;
 import java.util.List;
@@ -33,11 +32,12 @@ import java.util.List;
  */
 public class PipesProcessor implements ProcessorFactory {
 
-    public PipesProcessor() {}
+    public PipesProcessor() {
+    }
 
     @Override
-    public <C, S, E> Processor<C, S, E> mint(final TraverserFactory<C> 
traverserFactory, final List<CFunction<C>> functions) {
-        return new Pipes<>(traverserFactory, functions);
+    public <C, S, E> Processor<C, S, E> mint(final Compilation<C, S, E> 
compilation) {
+        return new Pipes<>(compilation);
     }
 
     @Override
diff --git 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/ReduceStep.java
 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/ReduceStep.java
index 9d3a640..772d58b 100644
--- 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/ReduceStep.java
+++ 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/ReduceStep.java
@@ -60,4 +60,11 @@ public class ReduceStep<C, S, E> extends AbstractStep<C, S, 
E> {
     public boolean hasNext() {
         return !this.done && super.hasNext();
     }
+
+    @Override
+    public void reset() {
+        super.reset();
+        this.reducer.reset();
+        this.done = false;
+    }
 }
diff --git 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/BasicReducer.java
 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/BasicReducer.java
index e834c1e..8d7c1fd 100644
--- 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/BasicReducer.java
+++ 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/BasicReducer.java
@@ -24,11 +24,18 @@ package org.apache.tinkerpop.machine.pipes.util;
 public class BasicReducer<S> implements Reducer<S> {
 
     private S value;
+    private final S initialValue;
 
     public BasicReducer(final S initialValue) {
+        this.initialValue = initialValue;
         this.value = initialValue;
     }
 
+    @Override
+    public void reset() {
+        this.value = this.initialValue;
+    }
+
     public S get() {
         return this.value;
     }
diff --git 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/Reducer.java
 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/Reducer.java
index 44e680e..6bc5a89 100644
--- 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/Reducer.java
+++ 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/Reducer.java
@@ -25,6 +25,8 @@ import java.io.Serializable;
  */
 public interface Reducer<S> extends Serializable {
 
+    public void reset();
+
     public S get();
 
     public void update(final S newValue);
diff --git 
a/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java
 
b/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java
index a6caa8d..2275507 100644
--- 
a/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java
+++ 
b/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java
@@ -39,7 +39,7 @@ public class PipesTest {
                 .withProcessor(PipesProcessor.class)
                 .withStrategy(IdentityStrategy.class);
 
-        Traversal<Long, Long, ?> traversal = g.inject(7L, 10L, 
12L).identity().incr().incr().path();
+        Traversal<Long, Long, ?> traversal = g.inject(7L, 10L, 
12L).identity().incr().incr().path().by(__.count());
         System.out.println(TraversalUtil.getBytecode(traversal));
         System.out.println(traversal);
         System.out.println(traversal.toList());

Reply via email to