This is an automated email from the ASF dual-hosted git repository.

okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/tp4 by this push:
     new 955f2f1  minor nothings. calling it a day.
955f2f1 is described below

commit 955f2f14d1b0a21b3aa616e3b6c50dca827b662a
Author: Marko A. Rodriguez <[email protected]>
AuthorDate: Tue Mar 12 16:16:38 2019 -0600

    minor nothings. calling it a day.
---
 .../org/apache/tinkerpop/machine/beam/Beam.java    | 32 ++++++++++++----------
 .../apache/tinkerpop/machine/beam/BeamTest.java    |  4 ++-
 2 files changed, 20 insertions(+), 16 deletions(-)

diff --git a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java
index cf517e9..9888804 100644
--- a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java
+++ b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java
@@ -60,26 +60,27 @@ public class Beam<C, S, E> implements Processor<C, S, E> {
     private final List<Fn> functions = new ArrayList<>();
     private Iterator<Traverser<C, E>> iterator = null;
 
-    public Beam(final Compilation<C, S, E> compilation) {
+    private final TraverserCoder<C, S> coder = new TraverserCoder<>();
 
+    public Beam(final Compilation<C, S, E> compilation) {
         this.pipeline = Pipeline.create();
-        PCollection<Traverser<C, ?>> collection = this.pipeline.apply(Create.of(compilation.getTraverserFactory().create((Coefficient) LongCoefficient.create(), 1L)));
-        collection.setCoder(new TraverserCoder());
-        for (final CFunction<?> function : compilation.getFunctions()) {
+        PCollection<Traverser<C, S>> collection = this.pipeline.apply(Create.of(compilation.getTraverserFactory().create((Coefficient) LongCoefficient.create(), 1L)));
+        collection.setCoder(this.coder);
+        for (final CFunction<C> function : compilation.getFunctions()) {
             collection = processFunction(collection, compilation.getTraverserFactory(), function, false);
         }
-        collection.apply(ParDo.of(new OutputStep()));
+        collection.apply(ParDo.of(new OutputStep<>()));
         this.pipeline.getOptions().setRunner(new PipelineOptions.DirectRunner().create(this.pipeline.getOptions()));
     }
 
-    private PCollection<Traverser<C, ?>> processFunction(
-            PCollection<Traverser<C, ?>> collection,
+    private PCollection<Traverser<C, S>> processFunction(
+            PCollection<Traverser<C, S>> collection,
             final TraverserFactory<C> traverserFactory,
             final CFunction<?> function,
             final boolean branching) {
         DoFn<Traverser<C, S>, Traverser<C, E>> fn = null;
         if (function instanceof RepeatBranch) {
-            final Compilation<C, S, S> repeat = ((RepeatBranch) function).getRepeat();
+            ;
             final List<PCollection> outputs = new ArrayList<>();
             final TupleTag repeatDone = new TupleTag<>();
             final TupleTag repeatLoop = new TupleTag<>();
@@ -89,7 +90,7 @@ public class Beam<C, S, E> implements Processor<C, S, E> {
                 branches.get(repeatLoop).setCoder(new TraverserCoder());
                 branches.get(repeatDone).setCoder(new TraverserCoder());
                 outputs.add(branches.get(repeatDone));
-                for (final CFunction<C> repeatFunction : repeat.getFunctions()) {
+                for (final CFunction<C> repeatFunction : ((RepeatBranch<C, S>) function).getRepeat().getFunctions()) {
                     collection = this.processFunction(branches.get(repeatLoop), traverserFactory, repeatFunction, true);
                 }
             }
@@ -98,9 +99,9 @@ public class Beam<C, S, E> implements Processor<C, S, E> {
             collection.setCoder(new TraverserCoder());
         } else if (function instanceof BranchFunction) {
             final List<Compilation<C, ?, ?>> branches = ((BranchFunction<C, ?, ?>) function).getInternals();
-            final List<PCollection<Traverser<C, ?>>> collections = new ArrayList<>(branches.size());
+            final List<PCollection<Traverser<C, S>>> collections = new ArrayList<>(branches.size());
             for (final Compilation<C, ?, ?> branch : branches) {
-                PCollection<Traverser<C, ?>> branchCollection = collection;
+                PCollection<Traverser<C, S>> branchCollection = collection;
                 for (final CFunction<C> branchFunction : branch.getFunctions()) {
                     branchCollection = this.processFunction(branchCollection, traverserFactory, branchFunction, true);
                 }
@@ -118,7 +119,7 @@ public class Beam<C, S, E> implements Processor<C, S, E> {
             fn = new MapFn<>((MapFunction<C, S, E>) function);
         } else if (function instanceof ReduceFunction) {
             final ReduceFn<C, S, E> combine = new ReduceFn<>((ReduceFunction<C, S, E>) function, traverserFactory);
-            collection = (PCollection<Traverser<C, ?>>) collection.apply(Combine.globally((ReduceFn) combine));
+            collection = (PCollection<Traverser<C, S>>) collection.apply(Combine.globally((ReduceFn) combine));
             this.functions.add(combine);
         } else
             throw new RuntimeException("You need a new step type:" + function);
@@ -126,9 +127,9 @@ public class Beam<C, S, E> implements Processor<C, S, E> {
         if (!(function instanceof ReduceFunction) && !(function instanceof BranchFunction)) {
             if (!branching)
                 this.functions.add((Fn) fn);
-            collection = (PCollection<Traverser<C, ?>>) collection.apply(ParDo.of((DoFn) fn));
+            collection = (PCollection<Traverser<C, S>>) collection.apply(ParDo.of((DoFn) fn));
         }
-        collection.setCoder(new TraverserCoder());
+        collection.setCoder(this.coder);
         return collection;
     }
 
@@ -151,7 +152,8 @@ public class Beam<C, S, E> implements Processor<C, S, E> {
 
     @Override
     public void reset() {
-
+        OUTPUT.clear();
+        this.iterator = null;
     }
 
     @Override
diff --git a/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java b/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java
index 7323f74..364b740 100644
--- a/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java
+++ b/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java
@@ -26,6 +26,8 @@ import org.apache.tinkerpop.language.__;
 import org.apache.tinkerpop.machine.strategies.IdentityStrategy;
 import org.junit.jupiter.api.Test;
 
+import java.util.Arrays;
+
 import static org.apache.tinkerpop.language.__.incr;
 import static org.apache.tinkerpop.language.__.is;
 
@@ -40,7 +42,7 @@ public class BeamTest {
                 .withProcessor(BeamProcessor.class)
                 .withStrategy(IdentityStrategy.class);
 
-        Traversal<Long, ?, ?> traversal = g.inject(5L).repeat(incr()).until(is(10L));
+        Traversal<Long, ?, ?> traversal = g.inject(Arrays.asList(2L, 5L, 10L)).<Long>unfold().repeat(__.<Long>incr().identity().map(__.incr())).until(is(10L));
         System.out.println(TraversalUtil.getBytecode(traversal).getSourceInstructions());
         System.out.println(TraversalUtil.getBytecode(traversal));
         System.out.println(traversal);

Reply via email to