This is an automated email from the ASF dual-hosted git repository.

okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit 30a12669d04f2ecffca4a1b51b976eac88eca10f
Author: Marko A. Rodriguez <okramma...@gmail.com>
AuthorDate: Mon Mar 11 13:38:02 2019 -0600

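    NestedFunction.Branching functions are now executed in Beam by applying each
    nested traversal's functions to the same input PCollection and merging the
    resulting per-branch PCollections back together with Flatten. This is
    equivalent to GlobalChildren in TP3, where the individual nested traversals
    should be independent of one another. Starting to get the hang of the
    Apache Beam API.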
---
 .../machine/functions/NestedFunction.java          |  7 ++
 .../machine/functions/flatMap/UnionFlatMap.java    |  2 +-
 .../tinkerpop/machine/functions/map/MapMap.java    |  2 +-
 .../org/apache/tinkerpop/machine/beam/Beam.java    | 83 +++++++++++++---------
 .../java/org/apache/tinkerpop/machine/beam/Fn.java |  4 +-
 .../apache/tinkerpop/machine/beam/BeamTest.java    |  2 +-
 6 files changed, 62 insertions(+), 38 deletions(-)

diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/NestedFunction.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/NestedFunction.java
index a63f226..2119f54 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/NestedFunction.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/NestedFunction.java
@@ -31,4 +31,11 @@ public interface NestedFunction<C, S, E> extends 
CFunction<C> {
     public void setProcessor(final TraverserFactory<C> traverserFactory, final 
ProcessorFactory processorFactory);
 
     public List<List<CFunction<C>>> getFunctions();
+
+    public interface Branching<C, S, E> extends NestedFunction<C, S, E> {
+    }
+
+    public interface Internal<C, S, E> extends NestedFunction<C, S, E> {
+
+    }
 }
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/flatMap/UnionFlatMap.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/flatMap/UnionFlatMap.java
index 7544580..cb5502a 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/flatMap/UnionFlatMap.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/flatMap/UnionFlatMap.java
@@ -39,7 +39,7 @@ import java.util.Set;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class UnionFlatMap<C, S, E> extends AbstractFunction<C, S, Iterator<E>> 
implements FlatMapFunction<C, S, E>, NestedFunction<C, S, E> {
+public class UnionFlatMap<C, S, E> extends AbstractFunction<C, S, Iterator<E>> 
implements FlatMapFunction<C, S, E>, NestedFunction.Branching<C, S, E> {
 
     private final List<List<CFunction<C>>> branchFunctions;
     private transient List<Processor<C, S, E>> processors;
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/MapMap.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/MapMap.java
index 08d4961..63d64c0 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/MapMap.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/MapMap.java
@@ -36,7 +36,7 @@ import java.util.Set;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class MapMap<C, S, E> extends AbstractFunction<C, S, E> implements 
MapFunction<C, S, E>, NestedFunction<C, S, E> {
+public class MapMap<C, S, E> extends AbstractFunction<C, S, E> implements 
MapFunction<C, S, E>, NestedFunction.Internal<C, S, E> {
 
     private final List<CFunction<C>> mapFunctions;
     private TraverserFactory<C> traverserFactory;
diff --git 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java
index fa06730..6531693 100644
--- 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java
+++ 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java
@@ -23,8 +23,10 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.tinkerpop.machine.bytecode.Bytecode;
 import org.apache.tinkerpop.machine.bytecode.BytecodeUtil;
 import org.apache.tinkerpop.machine.coefficients.Coefficient;
@@ -54,54 +56,71 @@ public class Beam<C, S, E> implements Processor<C, S, E> {
     private final Pipeline pipeline;
     public static List<Traverser> OUTPUT = new ArrayList<>(); // FIX THIS!
     private final List<Fn> functions = new ArrayList<>();
-    Iterator<Traverser> iterator = null;
+    Iterator<Traverser<C, E>> iterator = null;
+    private TraverserFactory<C> traverserFactory;
 
 
     public Beam(final TraverserFactory<C> traverserFactory, final 
List<CFunction<C>> functions) {
+        this.traverserFactory = traverserFactory;
         this.pipeline = Pipeline.create();
         
this.pipeline.getCoderRegistry().registerCoderForClass(Traverser.class, new 
TraverserCoder<>());
-        PCollection collection = 
this.pipeline.apply(Create.of(traverserFactory.create((Coefficient) 
LongCoefficient.create(), 1L)));
+        PCollection<Traverser<C, ?>> collection = 
this.pipeline.apply(Create.of(traverserFactory.create((Coefficient) 
LongCoefficient.create(), 1L)));
         collection.setCoder(new TraverserCoder());
-
-        DoFn fn = null;
         for (final CFunction<?> function : functions) {
-            if (function instanceof NestedFunction)
-                ((NestedFunction<C, ?, ?>) 
function).setProcessor(traverserFactory, new PipesProcessor());
-
-            if (function instanceof InitialFunction) {
-                fn = new InitialFn<>((InitialFunction) function, 
traverserFactory);
-            } else if (function instanceof FilterFunction) {
-                fn = new FilterFn<>((FilterFunction) function);
-            } else if (function instanceof FlatMapFunction) {
-                fn = new FlatMapFn<>((FlatMapFunction) function);
-            } else if (function instanceof MapFunction) {
-                fn = new MapFn<>((MapFunction) function);
-            } else if (function instanceof ReduceFunction) {
-                final ReduceFn combine = new ReduceFn<>((ReduceFunction) 
function, traverserFactory);
-                collection = (PCollection) 
collection.apply(Combine.globally(combine));
-                this.functions.add(combine);
-            } else
-                throw new RuntimeException("You need a new step type:" + 
function);
-
-            if (!(function instanceof ReduceFunction)) {
-                this.functions.add((Fn) fn);
-                collection = (PCollection) collection.apply(ParDo.of(fn));
-            }
-            collection.setCoder(new TraverserCoder());
-
-
+            collection = processFunction(collection, function, false);
         }
         collection.apply(ParDo.of(new OutputStep()));
         this.pipeline.getOptions().setRunner(new 
PipelineOptions.DirectRunner().create(this.pipeline.getOptions()));
     }
 
+    private PCollection<Traverser<C, ?>> 
processFunction(PCollection<Traverser<C, ?>> collection, final CFunction<?> 
function, final boolean branching) {
+        DoFn<Traverser<C, S>, Traverser<C, E>> fn = null;
+        if (function instanceof NestedFunction.Internal)
+            ((NestedFunction<C, ?, ?>) 
function).setProcessor(this.traverserFactory, new PipesProcessor());
+
+        if (function instanceof NestedFunction.Branching) {
+            final List<List<CFunction<C>>> branches = 
((NestedFunction.Branching) function).getFunctions();
+            final List<PCollection<Traverser<C, ?>>> collections = new 
ArrayList<>(branches.size());
+            for (final List<CFunction<C>> branch : branches) {
+                PCollection<Traverser<C, ?>> branchCollection = collection;
+                for (final CFunction<C> branchFunction : branch) {
+                    branchCollection = this.processFunction(branchCollection, 
branchFunction, true);
+                }
+                collections.add(branchCollection);
+            }
+            collection = 
PCollectionList.of(collections).apply(Flatten.pCollections());
+            this.functions.add(new FlatMapFn<>((FlatMapFunction<C, S, E>) 
function));
+        } else if (function instanceof InitialFunction) {
+            fn = new InitialFn((InitialFunction<C, S>) function, 
this.traverserFactory);
+        } else if (function instanceof FilterFunction) {
+            fn = new FilterFn((FilterFunction<C, S>) function);
+        } else if (function instanceof FlatMapFunction) {
+            fn = new FlatMapFn<>((FlatMapFunction<C, S, E>) function);
+        } else if (function instanceof MapFunction) {
+            fn = new MapFn<>((MapFunction<C, S, E>) function);
+        } else if (function instanceof ReduceFunction) {
+            final ReduceFn<C, S, E> combine = new 
ReduceFn<>((ReduceFunction<C, S, E>) function, this.traverserFactory);
+            collection = (PCollection<Traverser<C, ?>>) 
collection.apply(Combine.globally((ReduceFn) combine));
+            this.functions.add(combine);
+        } else
+            throw new RuntimeException("You need a new step type:" + function);
+
+        if (!(function instanceof ReduceFunction) && !(function instanceof 
NestedFunction.Branching)) {
+            if (!branching)
+                this.functions.add((Fn) fn);
+            collection = (PCollection<Traverser<C, ?>>) 
collection.apply(ParDo.of((DoFn) fn));
+        }
+        collection.setCoder(new TraverserCoder());
+        return collection;
+    }
+
     public Beam(final Bytecode<C> bytecode) {
         this(BytecodeUtil.getTraverserFactory(bytecode).get(), 
BytecodeUtil.compile(BytecodeUtil.strategize(bytecode)));
     }
 
     @Override
-    public void addStart(Traverser<C, S> traverser) {
-        ((Fn) this.functions.get(0)).addStart(traverser);
+    public void addStart(final Traverser<C, S> traverser) {
+        this.functions.get(0).addStart(traverser);
     }
 
     @Override
@@ -129,7 +148,7 @@ public class Beam<C, S, E> implements Processor<C, S, E> {
     private final void setupPipeline() {
         if (null == this.iterator) {
             this.pipeline.run().waitUntilFinish();
-            this.iterator = new ArrayList<>(OUTPUT).iterator();
+            this.iterator = (Iterator) new ArrayList<>(OUTPUT).iterator();
             OUTPUT.clear();
         }
     }
diff --git 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Fn.java 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Fn.java
index ca0a357..fc84fae 100644
--- a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Fn.java
+++ b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Fn.java
@@ -20,12 +20,10 @@ package org.apache.tinkerpop.machine.beam;
 
 import org.apache.tinkerpop.machine.traversers.Traverser;
 
-import java.io.Serializable;
-
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public interface Fn<C, S,E> {
+public interface Fn<C, S, E> {
 
     public void addStart(final Traverser<C, S> traverser);
 
diff --git 
a/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java
 
b/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java
index 5a5e020..268d75e 100644
--- 
a/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java
+++ 
b/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java
@@ -48,7 +48,7 @@ public class BeamTest {
         System.out.println(traversal);
         System.out.println(traversal.toList());
         System.out.println("\n----------\n");
-        traversal = g.inject(7L).union(__.<Long>incr().incr(),__.incr());
+        traversal = 
g.inject(7L).union(__.<Long>incr().incr().union(__.incr(),__.incr()),__.incr());
         System.out.println(TraversalUtil.getBytecode(traversal));
         System.out.println(traversal);
         System.out.println(traversal.toList());

Reply via email to