This is an automated email from the ASF dual-hosted git repository. okram pushed a commit to branch tp4 in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit d602c0706e0e430eacfc4761d61d6b2ddf7e2340
Author: Marko A. Rodriguez <okramma...@gmail.com>
AuthorDate: Mon Mar 11 06:49:22 2019 -0600

    minor changes. taking a break.
---
 .../java/org/apache/tinkerpop/language/Symbols.java   |  5 +++++
 .../java/org/apache/tinkerpop/machine/beam/Beam.java  | 19 ++++++++++---------
 2 files changed, 15 insertions(+), 9 deletions(-)

diff --git a/java/core/src/main/java/org/apache/tinkerpop/language/Symbols.java b/java/core/src/main/java/org/apache/tinkerpop/language/Symbols.java
index b302b88..9ea778d 100644
--- a/java/core/src/main/java/org/apache/tinkerpop/language/Symbols.java
+++ b/java/core/src/main/java/org/apache/tinkerpop/language/Symbols.java
@@ -23,6 +23,11 @@ package org.apache.tinkerpop.language;
  */
 public final class Symbols {
 
+    // SOURCE OPS
+    public static final String COEFFICIENT = "coefficient";
+
+
+    // INSTRUCTION OPS
     public static final String AS = "as";
     public static final String C = "c";
     public static final String COUNT = "count";
diff --git a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java
index b93174e..f2813b4 100644
--- a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java
+++ b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java
@@ -19,6 +19,7 @@
 package org.apache.tinkerpop.machine.beam;
 
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -44,17 +45,16 @@ import java.util.List;
  */
 public class Beam<C, S, E> implements Processor<C, S, E> {
 
-    final Pipeline pipeline;
-    PCollection collection;
-    public static List<Traverser> OUTPUT = new ArrayList<>();
-    Iterator<Traverser> iterator = null;
+    private final Pipeline pipeline;
+    public static List<Traverser> OUTPUT = new ArrayList<>(); // FIX THIS!
     private final List<DoFn> functions = new ArrayList<>();
+    Iterator<Traverser> iterator = null;
 
     public Beam(final List<CFunction<C>> functions) {
         this.pipeline = Pipeline.create();
         this.pipeline.getCoderRegistry().registerCoderForClass(Traverser.class, new TraverserCoder<>());
-        this.collection = this.pipeline.apply(Create.of(new CompleteTraverser(LongCoefficient.create(), 1L)));
-        this.collection.setCoder(new TraverserCoder());
+        PCollection collection = this.pipeline.apply(Create.of(new CompleteTraverser(LongCoefficient.create(), 1L)));
+        collection.setCoder(new TraverserCoder());
 
         DoFn fn = null;
         for (final CFunction<?> function : functions) {
@@ -69,10 +69,11 @@ public class Beam<C, S, E> implements Processor<C, S, E> {
             } else
                 throw new RuntimeException("You need a new step type:" + function);
             this.functions.add(fn);
-            this.collection = (PCollection) collection.apply(ParDo.of(fn));
-            this.collection.setCoder(new TraverserCoder());
+            collection = (PCollection) collection.apply(ParDo.of(fn));
+            collection.setCoder(new TraverserCoder());
         }
         collection = (PCollection) collection.apply(ParDo.of(new OutputStep()));
+        this.pipeline.getOptions().setRunner(new PipelineOptions.DirectRunner().create(this.pipeline.getOptions()));
     }
 
     public Beam(final Bytecode<C> bytecode) {
@@ -108,7 +109,7 @@ public class Beam<C, S, E> implements Processor<C, S, E> {
 
     private final void setupPipeline() {
         if (null == this.iterator) {
-            pipeline.run().waitUntilFinish();
+            this.pipeline.run().waitUntilFinish();
             this.iterator = new ArrayList<>(OUTPUT).iterator();
             OUTPUT.clear();
         }