This is an automated email from the ASF dual-hosted git repository.
okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
The following commit(s) were added to refs/heads/tp4 by this push:
new 955f2f1 minor nothings. calling it a day.
955f2f1 is described below
commit 955f2f14d1b0a21b3aa616e3b6c50dca827b662a
Author: Marko A. Rodriguez <[email protected]>
AuthorDate: Tue Mar 12 16:16:38 2019 -0600
minor nothings. calling it a day.
---
.../org/apache/tinkerpop/machine/beam/Beam.java | 32 ++++++++++++----------
.../apache/tinkerpop/machine/beam/BeamTest.java | 4 ++-
2 files changed, 20 insertions(+), 16 deletions(-)
diff --git a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java
index cf517e9..9888804 100644
--- a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java
+++ b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java
@@ -60,26 +60,27 @@ public class Beam<C, S, E> implements Processor<C, S, E> {
private final List<Fn> functions = new ArrayList<>();
private Iterator<Traverser<C, E>> iterator = null;
- public Beam(final Compilation<C, S, E> compilation) {
+ private final TraverserCoder<C, S> coder = new TraverserCoder<>();
+ public Beam(final Compilation<C, S, E> compilation) {
this.pipeline = Pipeline.create();
- PCollection<Traverser<C, ?>> collection =
this.pipeline.apply(Create.of(compilation.getTraverserFactory().create((Coefficient)
LongCoefficient.create(), 1L)));
- collection.setCoder(new TraverserCoder());
- for (final CFunction<?> function : compilation.getFunctions()) {
+ PCollection<Traverser<C, S>> collection =
this.pipeline.apply(Create.of(compilation.getTraverserFactory().create((Coefficient)
LongCoefficient.create(), 1L)));
+ collection.setCoder(this.coder);
+ for (final CFunction<C> function : compilation.getFunctions()) {
collection = processFunction(collection,
compilation.getTraverserFactory(), function, false);
}
- collection.apply(ParDo.of(new OutputStep()));
+ collection.apply(ParDo.of(new OutputStep<>()));
this.pipeline.getOptions().setRunner(new
PipelineOptions.DirectRunner().create(this.pipeline.getOptions()));
}
- private PCollection<Traverser<C, ?>> processFunction(
- PCollection<Traverser<C, ?>> collection,
+ private PCollection<Traverser<C, S>> processFunction(
+ PCollection<Traverser<C, S>> collection,
final TraverserFactory<C> traverserFactory,
final CFunction<?> function,
final boolean branching) {
DoFn<Traverser<C, S>, Traverser<C, E>> fn = null;
if (function instanceof RepeatBranch) {
- final Compilation<C, S, S> repeat = ((RepeatBranch)
function).getRepeat();
+ ;
final List<PCollection> outputs = new ArrayList<>();
final TupleTag repeatDone = new TupleTag<>();
final TupleTag repeatLoop = new TupleTag<>();
@@ -89,7 +90,7 @@ public class Beam<C, S, E> implements Processor<C, S, E> {
branches.get(repeatLoop).setCoder(new TraverserCoder());
branches.get(repeatDone).setCoder(new TraverserCoder());
outputs.add(branches.get(repeatDone));
- for (final CFunction<C> repeatFunction :
repeat.getFunctions()) {
+ for (final CFunction<C> repeatFunction : ((RepeatBranch<C, S>)
function).getRepeat().getFunctions()) {
collection =
this.processFunction(branches.get(repeatLoop), traverserFactory,
repeatFunction, true);
}
}
@@ -98,9 +99,9 @@ public class Beam<C, S, E> implements Processor<C, S, E> {
collection.setCoder(new TraverserCoder());
} else if (function instanceof BranchFunction) {
final List<Compilation<C, ?, ?>> branches = ((BranchFunction<C, ?,
?>) function).getInternals();
- final List<PCollection<Traverser<C, ?>>> collections = new
ArrayList<>(branches.size());
+ final List<PCollection<Traverser<C, S>>> collections = new
ArrayList<>(branches.size());
for (final Compilation<C, ?, ?> branch : branches) {
- PCollection<Traverser<C, ?>> branchCollection = collection;
+ PCollection<Traverser<C, S>> branchCollection = collection;
for (final CFunction<C> branchFunction :
branch.getFunctions()) {
branchCollection = this.processFunction(branchCollection,
traverserFactory, branchFunction, true);
}
@@ -118,7 +119,7 @@ public class Beam<C, S, E> implements Processor<C, S, E> {
fn = new MapFn<>((MapFunction<C, S, E>) function);
} else if (function instanceof ReduceFunction) {
final ReduceFn<C, S, E> combine = new
ReduceFn<>((ReduceFunction<C, S, E>) function, traverserFactory);
- collection = (PCollection<Traverser<C, ?>>)
collection.apply(Combine.globally((ReduceFn) combine));
+ collection = (PCollection<Traverser<C, S>>)
collection.apply(Combine.globally((ReduceFn) combine));
this.functions.add(combine);
} else
throw new RuntimeException("You need a new step type:" + function);
@@ -126,9 +127,9 @@ public class Beam<C, S, E> implements Processor<C, S, E> {
if (!(function instanceof ReduceFunction) && !(function instanceof
BranchFunction)) {
if (!branching)
this.functions.add((Fn) fn);
- collection = (PCollection<Traverser<C, ?>>)
collection.apply(ParDo.of((DoFn) fn));
+ collection = (PCollection<Traverser<C, S>>)
collection.apply(ParDo.of((DoFn) fn));
}
- collection.setCoder(new TraverserCoder());
+ collection.setCoder(this.coder);
return collection;
}
@@ -151,7 +152,8 @@ public class Beam<C, S, E> implements Processor<C, S, E> {
@Override
public void reset() {
-
+ OUTPUT.clear();
+ this.iterator = null;
}
@Override
diff --git a/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java b/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java
index 7323f74..364b740 100644
--- a/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java
+++ b/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java
@@ -26,6 +26,8 @@ import org.apache.tinkerpop.language.__;
import org.apache.tinkerpop.machine.strategies.IdentityStrategy;
import org.junit.jupiter.api.Test;
+import java.util.Arrays;
+
import static org.apache.tinkerpop.language.__.incr;
import static org.apache.tinkerpop.language.__.is;
@@ -40,7 +42,7 @@ public class BeamTest {
.withProcessor(BeamProcessor.class)
.withStrategy(IdentityStrategy.class);
- Traversal<Long, ?, ?> traversal =
g.inject(5L).repeat(incr()).until(is(10L));
+ Traversal<Long, ?, ?> traversal = g.inject(Arrays.asList(2L, 5L,
10L)).<Long>unfold().repeat(__.<Long>incr().identity().map(__.incr())).until(is(10L));
System.out.println(TraversalUtil.getBytecode(traversal).getSourceInstructions());
System.out.println(TraversalUtil.getBytecode(traversal));
System.out.println(traversal);