This is an automated email from the ASF dual-hosted git repository.

okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit d602c0706e0e430eacfc4761d61d6b2ddf7e2340
Author: Marko A. Rodriguez <okramma...@gmail.com>
AuthorDate: Mon Mar 11 06:49:22 2019 -0600

    minor changes. taking a break.
---
 .../java/org/apache/tinkerpop/language/Symbols.java   |  5 +++++
 .../java/org/apache/tinkerpop/machine/beam/Beam.java  | 19 ++++++++++---------
 2 files changed, 15 insertions(+), 9 deletions(-)

diff --git a/java/core/src/main/java/org/apache/tinkerpop/language/Symbols.java b/java/core/src/main/java/org/apache/tinkerpop/language/Symbols.java
index b302b88..9ea778d 100644
--- a/java/core/src/main/java/org/apache/tinkerpop/language/Symbols.java
+++ b/java/core/src/main/java/org/apache/tinkerpop/language/Symbols.java
@@ -23,6 +23,11 @@ package org.apache.tinkerpop.language;
  */
 public final class Symbols {
 
+    // SOURCE OPS
+    public static final String COEFFICIENT = "coefficient";
+
+
+    // INSTRUCTION OPS
     public static final String AS = "as";
     public static final String C = "c";
     public static final String COUNT = "count";
diff --git a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java
index b93174e..f2813b4 100644
--- a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java
+++ b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java
@@ -19,6 +19,7 @@
 package org.apache.tinkerpop.machine.beam;
 
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -44,17 +45,16 @@ import java.util.List;
  */
 public class Beam<C, S, E> implements Processor<C, S, E> {
 
-    final Pipeline pipeline;
-    PCollection collection;
-    public static List<Traverser> OUTPUT = new ArrayList<>();
-    Iterator<Traverser> iterator = null;
+    private final Pipeline pipeline;
+    public static List<Traverser> OUTPUT = new ArrayList<>(); // FIX THIS!
     private final List<DoFn> functions = new ArrayList<>();
+    Iterator<Traverser> iterator = null;
 
     public Beam(final List<CFunction<C>> functions) {
         this.pipeline = Pipeline.create();
         this.pipeline.getCoderRegistry().registerCoderForClass(Traverser.class, new TraverserCoder<>());
-        this.collection = this.pipeline.apply(Create.of(new CompleteTraverser(LongCoefficient.create(), 1L)));
-        this.collection.setCoder(new TraverserCoder());
+        PCollection collection = this.pipeline.apply(Create.of(new CompleteTraverser(LongCoefficient.create(), 1L)));
+        collection.setCoder(new TraverserCoder());
 
         DoFn fn = null;
         for (final CFunction<?> function : functions) {
@@ -69,10 +69,11 @@ public class Beam<C, S, E> implements Processor<C, S, E> {
             } else
                 throw new RuntimeException("You need a new step type:" + function);
             this.functions.add(fn);
-            this.collection = (PCollection) collection.apply(ParDo.of(fn));
-            this.collection.setCoder(new TraverserCoder());
+            collection = (PCollection) collection.apply(ParDo.of(fn));
+            collection.setCoder(new TraverserCoder());
         }
         collection = (PCollection) collection.apply(ParDo.of(new OutputStep()));
+        this.pipeline.getOptions().setRunner(new PipelineOptions.DirectRunner().create(this.pipeline.getOptions()));
     }
 
     public Beam(final Bytecode<C> bytecode) {
@@ -108,7 +109,7 @@ public class Beam<C, S, E> implements Processor<C, S, E> {
 
     private final void setupPipeline() {
         if (null == this.iterator) {
-            pipeline.run().waitUntilFinish();
+            this.pipeline.run().waitUntilFinish();
             this.iterator = new ArrayList<>(OUTPUT).iterator();
             OUTPUT.clear();
         }

Reply via email to