This is an automated email from the ASF dual-hosted git repository.

okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit e21924fcd2cba64e1e8c88e70c092c72cfd053c4
Author: Marko A. Rodriguez <[email protected]>
AuthorDate: Tue Mar 12 15:28:59 2019 -0600

    repeat() implemented in Apache Beam. It's not pretty, but it works. I'm not 
that confident with the Beam API. Once I fully grok it, I will do a massive 
cleanup of the Beam compiler.
---
 .../org/apache/tinkerpop/language/Symbols.java     |  3 +
 .../org/apache/tinkerpop/language/Traversal.java   |  5 ++
 .../tinkerpop/machine/bytecode/BytecodeUtil.java   |  3 +
 .../machine/functions/branch/RepeatBranch.java     | 18 ++++--
 .../machine/functions/flatmap/UnfoldFlatMap.java   | 68 ++++++++++++++++++++++
 .../org/apache/tinkerpop/machine/beam/Beam.java    | 27 +++++++--
 .../apache/tinkerpop/machine/beam/BranchFn.java    |  2 +-
 .../machine/beam/{BranchFn.java => RepeatFn.java}  | 28 ++++++---
 .../apache/tinkerpop/machine/beam/BeamTest.java    | 10 ++--
 .../apache/tinkerpop/machine/pipes/PipesTest.java  |  4 +-
 10 files changed, 147 insertions(+), 21 deletions(-)

diff --git a/java/core/src/main/java/org/apache/tinkerpop/language/Symbols.java 
b/java/core/src/main/java/org/apache/tinkerpop/language/Symbols.java
index 8b4ebef..a11321f 100644
--- a/java/core/src/main/java/org/apache/tinkerpop/language/Symbols.java
+++ b/java/core/src/main/java/org/apache/tinkerpop/language/Symbols.java
@@ -47,6 +47,7 @@ public final class Symbols {
     public static final String PATH = "path";
     public static final String REPEAT = "repeat";
     public static final String SUM = "sum";
+    public static final String UNFOLD = "unfold";
     public static final String UNION = "union";
 
     public Type getOpType(final String op) {
@@ -77,6 +78,8 @@ public final class Symbols {
                 return Type.BRANCH;
             case SUM:
                 return Type.REDUCE;
+            case UNFOLD:
+                return Type.FLATMAP;
             case UNION:
                 return Type.BRANCH;
             default:
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/language/Traversal.java 
b/java/core/src/main/java/org/apache/tinkerpop/language/Traversal.java
index cf90d24..fdcc8d4 100644
--- a/java/core/src/main/java/org/apache/tinkerpop/language/Traversal.java
+++ b/java/core/src/main/java/org/apache/tinkerpop/language/Traversal.java
@@ -153,6 +153,11 @@ public class Traversal<C, S, E> implements Iterator<E> {
         return (Traversal) this;
     }
 
+    public <R> Traversal<C, S, R> unfold() {
+        this.bytecode.addInstruction(this.currentCoefficient, Symbols.UNFOLD);
+        return (Traversal) this;
+    }
+
     public <R> Traversal<C, S, R> union(final Traversal<C, E, R> traversal, 
Traversal<C, E, R>... traversals) {
         this.bytecode.addInstruction(this.currentCoefficient, Symbols.UNION, 
traversal, traversals);
         return (Traversal) this;
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java
index 42922b7..eea1df5 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java
@@ -28,6 +28,7 @@ import 
org.apache.tinkerpop.machine.functions.filter.HasKeyFilter;
 import org.apache.tinkerpop.machine.functions.filter.HasKeyValueFilter;
 import org.apache.tinkerpop.machine.functions.filter.IdentityFilter;
 import org.apache.tinkerpop.machine.functions.filter.IsFilter;
+import org.apache.tinkerpop.machine.functions.flatmap.UnfoldFlatMap;
 import org.apache.tinkerpop.machine.functions.initial.InjectInitial;
 import org.apache.tinkerpop.machine.functions.map.IncrMap;
 import org.apache.tinkerpop.machine.functions.map.MapMap;
@@ -161,6 +162,8 @@ public final class BytecodeUtil {
                 return new RepeatBranch(coefficient, labels, 
Compilation.compile(instruction.args()));
             case Symbols.SUM:
                 return new SumReduce<>(coefficient, labels);
+            case Symbols.UNFOLD:
+                return new UnfoldFlatMap<>(coefficient, labels);
             case Symbols.UNION:
                 return new UnionBranch<>(coefficient, labels, 
Compilation.compile(instruction.args()));
             default:
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/RepeatBranch.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/RepeatBranch.java
index f8ec403..c31b7c1 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/RepeatBranch.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/RepeatBranch.java
@@ -22,6 +22,7 @@ import org.apache.tinkerpop.machine.bytecode.Compilation;
 import org.apache.tinkerpop.machine.coefficients.Coefficient;
 import org.apache.tinkerpop.machine.functions.AbstractFunction;
 import org.apache.tinkerpop.machine.functions.BranchFunction;
+import org.apache.tinkerpop.machine.processor.Processor;
 import org.apache.tinkerpop.machine.traversers.Traverser;
 import org.apache.tinkerpop.util.IteratorUtils;
 import org.apache.tinkerpop.util.StringFactory;
@@ -47,14 +48,14 @@ public class RepeatBranch<C, S> extends AbstractFunction<C, 
S, Iterator<Traverse
 
     @Override
     public Iterator<Traverser<C, S>> apply(final Traverser<C, S> traverser) {
-        this.repeat.getProcessor().addStart(traverser);
-        return IteratorUtils.filter(this.repeat.getProcessor(), t -> {
+        final Processor<C, S, S> repeatProcessor = this.repeat.getProcessor();
+        repeatProcessor.addStart(traverser);
+        return IteratorUtils.filter(repeatProcessor, t -> {
             if (!this.until.filterTraverser(t)) {
-                this.repeat.getProcessor().addStart(t);
+                repeatProcessor.addStart(t);
                 return false;
             } else
                 return true;
-
         });
     }
 
@@ -67,4 +68,13 @@ public class RepeatBranch<C, S> extends AbstractFunction<C, 
S, Iterator<Traverse
     public List<Compilation<C, ?, ?>> getInternals() {
         return Arrays.asList(this.repeat, this.until);
     }
+
+    public Compilation<C, S, S> getRepeat() {
+        return this.repeat;
+    }
+
+    public Compilation<C, S, ?> getUntil() {
+        return this.until;
+    }
+
 }
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/flatmap/UnfoldFlatMap.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/flatmap/UnfoldFlatMap.java
new file mode 100644
index 0000000..807ae39
--- /dev/null
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/flatmap/UnfoldFlatMap.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.machine.functions.flatmap;
+
+import org.apache.tinkerpop.machine.coefficients.Coefficient;
+import org.apache.tinkerpop.machine.functions.AbstractFunction;
+import org.apache.tinkerpop.machine.functions.FlatMapFunction;
+import org.apache.tinkerpop.machine.traversers.Traverser;
+import org.apache.tinkerpop.util.ArrayIterator;
+import org.apache.tinkerpop.util.IteratorUtils;
+
+import java.lang.reflect.Array;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class UnfoldFlatMap<C, S, E> extends AbstractFunction<C, S, E> 
implements FlatMapFunction<C, S, E> {
+
+    public UnfoldFlatMap(final Coefficient<C> coefficient, final Set<String> 
labels) {
+        super(coefficient, labels);
+    }
+
+    @Override
+    public Iterator<E> apply(final Traverser<C, S> traverser) {
+        final S object = traverser.object();
+        if (object instanceof Iterator)
+            return (Iterator<E>) object;
+        else if (object instanceof Iterable)
+            return ((Iterable<E>) object).iterator();
+        else if (object instanceof Map)
+            return ((Map) object).entrySet().iterator();
+        else if (object.getClass().isArray())
+            return handleArrays(object);
+        else
+            return IteratorUtils.of((E) object);
+    }
+
+    private final Iterator<E> handleArrays(final Object array) {
+        if (array instanceof Object[]) {
+            return new ArrayIterator<>((E[]) array);
+        } else {
+            int len = Array.getLength(array);
+            final Object[] objectArray = new Object[len];
+            for (int i = 0; i < len; i++)
+                objectArray[i] = Array.get(array, i);
+            return new ArrayIterator<>((E[]) objectArray);
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java
index 166f44b..cf517e9 100644
--- 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java
+++ 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java
@@ -27,9 +27,9 @@ import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
-import org.apache.beam.sdk.values.TypeDescriptor;
-import org.apache.tinkerpop.machine.beam.serialization.CoefficientCoder;
-import org.apache.tinkerpop.machine.beam.serialization.ObjectCoder;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
 import org.apache.tinkerpop.machine.beam.serialization.TraverserCoder;
 import org.apache.tinkerpop.machine.bytecode.Compilation;
 import org.apache.tinkerpop.machine.coefficients.Coefficient;
@@ -41,6 +41,7 @@ import org.apache.tinkerpop.machine.functions.FlatMapFunction;
 import org.apache.tinkerpop.machine.functions.InitialFunction;
 import org.apache.tinkerpop.machine.functions.MapFunction;
 import org.apache.tinkerpop.machine.functions.ReduceFunction;
+import org.apache.tinkerpop.machine.functions.branch.RepeatBranch;
 import org.apache.tinkerpop.machine.processor.Processor;
 import org.apache.tinkerpop.machine.traversers.Traverser;
 import org.apache.tinkerpop.machine.traversers.TraverserFactory;
@@ -77,7 +78,25 @@ public class Beam<C, S, E> implements Processor<C, S, E> {
             final CFunction<?> function,
             final boolean branching) {
         DoFn<Traverser<C, S>, Traverser<C, E>> fn = null;
-        if (function instanceof BranchFunction) {
+        if (function instanceof RepeatBranch) {
+            final Compilation<C, S, S> repeat = ((RepeatBranch) 
function).getRepeat();
+            final List<PCollection> outputs = new ArrayList<>();
+            final TupleTag repeatDone = new TupleTag<>();
+            final TupleTag repeatLoop = new TupleTag<>();
+            for (int i = 0; i < 10; i++) {
+                fn = new RepeatFn((RepeatBranch) function, repeatDone, 
repeatLoop);
+                PCollectionTuple branches = (PCollectionTuple) 
collection.apply(ParDo.of(fn).withOutputTags(repeatLoop, 
TupleTagList.of(repeatDone)));
+                branches.get(repeatLoop).setCoder(new TraverserCoder());
+                branches.get(repeatDone).setCoder(new TraverserCoder());
+                outputs.add(branches.get(repeatDone));
+                for (final CFunction<C> repeatFunction : 
repeat.getFunctions()) {
+                    collection = 
this.processFunction(branches.get(repeatLoop), traverserFactory, 
repeatFunction, true);
+                }
+            }
+            this.functions.add((Fn) fn);
+            collection = (PCollection) PCollectionList.of((Iterable) 
outputs).apply(Flatten.pCollections());
+            collection.setCoder(new TraverserCoder());
+        } else if (function instanceof BranchFunction) {
             final List<Compilation<C, ?, ?>> branches = ((BranchFunction<C, ?, 
?>) function).getInternals();
             final List<PCollection<Traverser<C, ?>>> collections = new 
ArrayList<>(branches.size());
             for (final Compilation<C, ?, ?> branch : branches) {
diff --git 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BranchFn.java
 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BranchFn.java
index e1f9442..65e8cae 100644
--- 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BranchFn.java
+++ 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BranchFn.java
@@ -32,6 +32,6 @@ public class BranchFn<C, S, E> extends AbstractFn<C, S, E> {
 
     @ProcessElement
     public void processElement(final @Element Traverser<C, S> traverser, final 
OutputReceiver<Traverser<C, E>> output) {
-        throw new IllegalStateException("Branching is implementing using 
split/merge streams in Beam");
+        throw new IllegalStateException("Branching is implemented using 
split/merge streams in Beam");
     }
 }
\ No newline at end of file
diff --git 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BranchFn.java
 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/RepeatFn.java
similarity index 51%
copy from 
java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BranchFn.java
copy to 
java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/RepeatFn.java
index e1f9442..8edb581 100644
--- 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BranchFn.java
+++ 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/RepeatFn.java
@@ -18,20 +18,34 @@
  */
 package org.apache.tinkerpop.machine.beam;
 
-import org.apache.tinkerpop.machine.functions.BranchFunction;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.tinkerpop.machine.bytecode.Compilation;
+import org.apache.tinkerpop.machine.functions.branch.RepeatBranch;
 import org.apache.tinkerpop.machine.traversers.Traverser;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class BranchFn<C, S, E> extends AbstractFn<C, S, E> {
+public class RepeatFn<C, S> extends AbstractFn<C, S, S> {
 
-    public BranchFn(final BranchFunction<C, S, E> branchFunction) {
-        super(branchFunction);
+    private final Compilation<C, S, ?> until;
+    private final TupleTag repeatDone;
+    private final TupleTag repeatLoop;
+
+
+    public RepeatFn(final RepeatBranch<C, S> repeatBranch, final TupleTag 
repeatDone, final TupleTag repeatLoop) {
+        super(repeatBranch);
+        this.until = repeatBranch.getUntil();
+        this.repeatDone = repeatDone;
+        this.repeatLoop = repeatLoop;
     }
 
     @ProcessElement
-    public void processElement(final @Element Traverser<C, S> traverser, final 
OutputReceiver<Traverser<C, E>> output) {
-        throw new IllegalStateException("Branching is implementing using 
split/merge streams in Beam");
+    public void processElement(final @DoFn.Element Traverser<C, S> traverser, 
final MultiOutputReceiver out) {
+        if (this.until.filterTraverser(traverser))
+            out.get(this.repeatDone).output(traverser.clone());
+        else
+            out.get(this.repeatLoop).output(traverser.clone());
     }
-}
\ No newline at end of file
+}
diff --git 
a/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java
 
b/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java
index 246b678..7323f74 100644
--- 
a/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java
+++ 
b/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java
@@ -23,10 +23,12 @@ import org.apache.tinkerpop.language.Traversal;
 import org.apache.tinkerpop.language.TraversalSource;
 import org.apache.tinkerpop.language.TraversalUtil;
 import org.apache.tinkerpop.language.__;
-import org.apache.tinkerpop.machine.coefficients.LongCoefficient;
 import org.apache.tinkerpop.machine.strategies.IdentityStrategy;
 import org.junit.jupiter.api.Test;
 
+import static org.apache.tinkerpop.language.__.incr;
+import static org.apache.tinkerpop.language.__.is;
+
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
@@ -38,18 +40,18 @@ public class BeamTest {
                 .withProcessor(BeamProcessor.class)
                 .withStrategy(IdentityStrategy.class);
 
-        Traversal<Long, ?,?> traversal = g.inject(7L, 7L, 10L, 
12L).identity().incr().groupCount().by(__.incr());
+        Traversal<Long, ?, ?> traversal = 
g.inject(5L).repeat(incr()).until(is(10L));
         
System.out.println(TraversalUtil.getBytecode(traversal).getSourceInstructions());
         System.out.println(TraversalUtil.getBytecode(traversal));
         System.out.println(traversal);
         System.out.println(traversal.toList());
         System.out.println("\n----------\n");
-        traversal = g.inject(7L, 10L, 
12L).as("a").c(3L).map(__.incr()).identity().incr().count();
+        traversal = g.inject(7L, 10L, 
12L).as("a").c(3L).map(incr()).identity().incr().count();
         System.out.println(TraversalUtil.getBytecode(traversal));
         System.out.println(traversal);
         System.out.println(traversal.toList());
         System.out.println("\n----------\n");
-        traversal = 
g.inject(7L).union(__.<Long>incr().incr().union(__.<Long>incr().as("b").identity().as("a"),__.<Long>incr().identity()),__.incr());
+        traversal = 
g.inject(7L).union(__.<Long>incr().incr().union(__.<Long>incr().as("b").identity().as("a"),
 __.<Long>incr().identity()), __.incr());
         System.out.println(TraversalUtil.getBytecode(traversal));
         System.out.println(traversal);
         System.out.println(traversal.toList());
diff --git 
a/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java
 
b/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java
index 9302e5d..e12c439 100644
--- 
a/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java
+++ 
b/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java
@@ -27,6 +27,8 @@ import 
org.apache.tinkerpop.machine.coefficients.LongCoefficient;
 import org.apache.tinkerpop.machine.strategies.IdentityStrategy;
 import org.junit.jupiter.api.Test;
 
+import java.util.Arrays;
+
 import static org.apache.tinkerpop.language.__.incr;
 import static org.apache.tinkerpop.language.__.is;
 
@@ -42,7 +44,7 @@ public class PipesTest {
                 .withProcessor(PipesProcessor.class)
                 .withStrategy(IdentityStrategy.class);
 
-        Traversal<Long, ?, ?> traversal = 
g.inject(1L,1L).repeat(incr()).until(is(10L));
+        Traversal<Long, ?, ?> traversal = g.inject(Arrays.asList(1L, 
1L)).<Long>unfold().repeat(incr()).until(is(10L));
         System.out.println(TraversalUtil.getBytecode(traversal));
         System.out.println(traversal);
         System.out.println(traversal.toList());

Reply via email to