This is an automated email from the ASF dual-hosted git repository. okram pushed a commit to branch tp4 in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit e21924fcd2cba64e1e8c88e70c092c72cfd053c4 Author: Marko A. Rodriguez <[email protected]> AuthorDate: Tue Mar 12 15:28:59 2019 -0600 repeat() implemented in Apache Beam. It's not pretty, but it works. I'm not that confident with the Beam API. Once I fully grok it, I will do a massive cleanup of the Beam compiler. --- .../org/apache/tinkerpop/language/Symbols.java | 3 + .../org/apache/tinkerpop/language/Traversal.java | 5 ++ .../tinkerpop/machine/bytecode/BytecodeUtil.java | 3 + .../machine/functions/branch/RepeatBranch.java | 18 ++++-- .../machine/functions/flatmap/UnfoldFlatMap.java | 68 ++++++++++++++++++++++ .../org/apache/tinkerpop/machine/beam/Beam.java | 27 +++++++-- .../apache/tinkerpop/machine/beam/BranchFn.java | 2 +- .../machine/beam/{BranchFn.java => RepeatFn.java} | 28 ++++++--- .../apache/tinkerpop/machine/beam/BeamTest.java | 10 ++-- .../apache/tinkerpop/machine/pipes/PipesTest.java | 4 +- 10 files changed, 147 insertions(+), 21 deletions(-) diff --git a/java/core/src/main/java/org/apache/tinkerpop/language/Symbols.java b/java/core/src/main/java/org/apache/tinkerpop/language/Symbols.java index 8b4ebef..a11321f 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/language/Symbols.java +++ b/java/core/src/main/java/org/apache/tinkerpop/language/Symbols.java @@ -47,6 +47,7 @@ public final class Symbols { public static final String PATH = "path"; public static final String REPEAT = "repeat"; public static final String SUM = "sum"; + public static final String UNFOLD = "unfold"; public static final String UNION = "union"; public Type getOpType(final String op) { @@ -77,6 +78,8 @@ public final class Symbols { return Type.BRANCH; case SUM: return Type.REDUCE; + case UNFOLD: + return Type.FLATMAP; case UNION: return Type.BRANCH; default: diff --git a/java/core/src/main/java/org/apache/tinkerpop/language/Traversal.java b/java/core/src/main/java/org/apache/tinkerpop/language/Traversal.java index cf90d24..fdcc8d4 100644 --- 
a/java/core/src/main/java/org/apache/tinkerpop/language/Traversal.java +++ b/java/core/src/main/java/org/apache/tinkerpop/language/Traversal.java @@ -153,6 +153,11 @@ public class Traversal<C, S, E> implements Iterator<E> { return (Traversal) this; } + public <R> Traversal<C, S, R> unfold() { + this.bytecode.addInstruction(this.currentCoefficient, Symbols.UNFOLD); + return (Traversal) this; + } + public <R> Traversal<C, S, R> union(final Traversal<C, E, R> traversal, Traversal<C, E, R>... traversals) { this.bytecode.addInstruction(this.currentCoefficient, Symbols.UNION, traversal, traversals); return (Traversal) this; diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java b/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java index 42922b7..eea1df5 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java @@ -28,6 +28,7 @@ import org.apache.tinkerpop.machine.functions.filter.HasKeyFilter; import org.apache.tinkerpop.machine.functions.filter.HasKeyValueFilter; import org.apache.tinkerpop.machine.functions.filter.IdentityFilter; import org.apache.tinkerpop.machine.functions.filter.IsFilter; +import org.apache.tinkerpop.machine.functions.flatmap.UnfoldFlatMap; import org.apache.tinkerpop.machine.functions.initial.InjectInitial; import org.apache.tinkerpop.machine.functions.map.IncrMap; import org.apache.tinkerpop.machine.functions.map.MapMap; @@ -161,6 +162,8 @@ public final class BytecodeUtil { return new RepeatBranch(coefficient, labels, Compilation.compile(instruction.args())); case Symbols.SUM: return new SumReduce<>(coefficient, labels); + case Symbols.UNFOLD: + return new UnfoldFlatMap<>(coefficient, labels); case Symbols.UNION: return new UnionBranch<>(coefficient, labels, Compilation.compile(instruction.args())); default: diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/RepeatBranch.java b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/RepeatBranch.java index f8ec403..c31b7c1 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/RepeatBranch.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/RepeatBranch.java @@ -22,6 +22,7 @@ import org.apache.tinkerpop.machine.bytecode.Compilation; import org.apache.tinkerpop.machine.coefficients.Coefficient; import org.apache.tinkerpop.machine.functions.AbstractFunction; import org.apache.tinkerpop.machine.functions.BranchFunction; +import org.apache.tinkerpop.machine.processor.Processor; import org.apache.tinkerpop.machine.traversers.Traverser; import org.apache.tinkerpop.util.IteratorUtils; import org.apache.tinkerpop.util.StringFactory; @@ -47,14 +48,14 @@ public class RepeatBranch<C, S> extends AbstractFunction<C, S, Iterator<Traverse @Override public Iterator<Traverser<C, S>> apply(final Traverser<C, S> traverser) { - this.repeat.getProcessor().addStart(traverser); - return IteratorUtils.filter(this.repeat.getProcessor(), t -> { + final Processor<C, S, S> repeatProcessor = this.repeat.getProcessor(); + repeatProcessor.addStart(traverser); + return IteratorUtils.filter(repeatProcessor, t -> { if (!this.until.filterTraverser(t)) { - this.repeat.getProcessor().addStart(t); + repeatProcessor.addStart(t); return false; } else return true; - }); } @@ -67,4 +68,13 @@ public class RepeatBranch<C, S> extends AbstractFunction<C, S, Iterator<Traverse public List<Compilation<C, ?, ?>> getInternals() { return Arrays.asList(this.repeat, this.until); } + + public Compilation<C, S, S> getRepeat() { + return this.repeat; + } + + public Compilation<C, S, ?> getUntil() { + return this.until; + } + } diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/flatmap/UnfoldFlatMap.java 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/flatmap/UnfoldFlatMap.java new file mode 100644 index 0000000..807ae39 --- /dev/null +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/flatmap/UnfoldFlatMap.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.machine.functions.flatmap; + +import org.apache.tinkerpop.machine.coefficients.Coefficient; +import org.apache.tinkerpop.machine.functions.AbstractFunction; +import org.apache.tinkerpop.machine.functions.FlatMapFunction; +import org.apache.tinkerpop.machine.traversers.Traverser; +import org.apache.tinkerpop.util.ArrayIterator; +import org.apache.tinkerpop.util.IteratorUtils; + +import java.lang.reflect.Array; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; + +/** + * @author Marko A. 
Rodriguez (http://markorodriguez.com) + */ +public class UnfoldFlatMap<C, S, E> extends AbstractFunction<C, S, E> implements FlatMapFunction<C, S, E> { + + public UnfoldFlatMap(final Coefficient<C> coefficient, final Set<String> labels) { + super(coefficient, labels); + } + + @Override + public Iterator<E> apply(final Traverser<C, S> traverser) { + final S object = traverser.object(); + if (object instanceof Iterator) + return (Iterator<E>) object; + else if (object instanceof Iterable) + return ((Iterable<E>) object).iterator(); + else if (object instanceof Map) + return ((Map) object).entrySet().iterator(); + else if (object.getClass().isArray()) + return handleArrays(object); + else + return IteratorUtils.of((E) object); + } + + private final Iterator<E> handleArrays(final Object array) { + if (array instanceof Object[]) { + return new ArrayIterator<>((E[]) array); + } else { + int len = Array.getLength(array); + final Object[] objectArray = new Object[len]; + for (int i = 0; i < len; i++) + objectArray[i] = Array.get(array, i); + return new ArrayIterator<>((E[]) objectArray); + } + } +} \ No newline at end of file diff --git a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java index 166f44b..cf517e9 100644 --- a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java +++ b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java @@ -27,9 +27,9 @@ import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; -import org.apache.beam.sdk.values.TypeDescriptor; -import org.apache.tinkerpop.machine.beam.serialization.CoefficientCoder; -import org.apache.tinkerpop.machine.beam.serialization.ObjectCoder; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; 
+import org.apache.beam.sdk.values.TupleTagList; import org.apache.tinkerpop.machine.beam.serialization.TraverserCoder; import org.apache.tinkerpop.machine.bytecode.Compilation; import org.apache.tinkerpop.machine.coefficients.Coefficient; @@ -41,6 +41,7 @@ import org.apache.tinkerpop.machine.functions.FlatMapFunction; import org.apache.tinkerpop.machine.functions.InitialFunction; import org.apache.tinkerpop.machine.functions.MapFunction; import org.apache.tinkerpop.machine.functions.ReduceFunction; +import org.apache.tinkerpop.machine.functions.branch.RepeatBranch; import org.apache.tinkerpop.machine.processor.Processor; import org.apache.tinkerpop.machine.traversers.Traverser; import org.apache.tinkerpop.machine.traversers.TraverserFactory; @@ -77,7 +78,25 @@ public class Beam<C, S, E> implements Processor<C, S, E> { final CFunction<?> function, final boolean branching) { DoFn<Traverser<C, S>, Traverser<C, E>> fn = null; - if (function instanceof BranchFunction) { + if (function instanceof RepeatBranch) { + final Compilation<C, S, S> repeat = ((RepeatBranch) function).getRepeat(); + final List<PCollection> outputs = new ArrayList<>(); + final TupleTag repeatDone = new TupleTag<>(); + final TupleTag repeatLoop = new TupleTag<>(); + for (int i = 0; i < 10; i++) { + fn = new RepeatFn((RepeatBranch) function, repeatDone, repeatLoop); + PCollectionTuple branches = (PCollectionTuple) collection.apply(ParDo.of(fn).withOutputTags(repeatLoop, TupleTagList.of(repeatDone))); + branches.get(repeatLoop).setCoder(new TraverserCoder()); + branches.get(repeatDone).setCoder(new TraverserCoder()); + outputs.add(branches.get(repeatDone)); + for (final CFunction<C> repeatFunction : repeat.getFunctions()) { + collection = this.processFunction(branches.get(repeatLoop), traverserFactory, repeatFunction, true); + } + } + this.functions.add((Fn) fn); + collection = (PCollection) PCollectionList.of((Iterable) outputs).apply(Flatten.pCollections()); + collection.setCoder(new 
TraverserCoder()); + } else if (function instanceof BranchFunction) { final List<Compilation<C, ?, ?>> branches = ((BranchFunction<C, ?, ?>) function).getInternals(); final List<PCollection<Traverser<C, ?>>> collections = new ArrayList<>(branches.size()); for (final Compilation<C, ?, ?> branch : branches) { diff --git a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BranchFn.java b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BranchFn.java index e1f9442..65e8cae 100644 --- a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BranchFn.java +++ b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BranchFn.java @@ -32,6 +32,6 @@ public class BranchFn<C, S, E> extends AbstractFn<C, S, E> { @ProcessElement public void processElement(final @Element Traverser<C, S> traverser, final OutputReceiver<Traverser<C, E>> output) { - throw new IllegalStateException("Branching is implementing using split/merge streams in Beam"); + throw new IllegalStateException("Branching is implemented using split/merge streams in Beam"); } } \ No newline at end of file diff --git a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BranchFn.java b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/RepeatFn.java similarity index 51% copy from java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BranchFn.java copy to java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/RepeatFn.java index e1f9442..8edb581 100644 --- a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BranchFn.java +++ b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/RepeatFn.java @@ -18,20 +18,34 @@ */ package org.apache.tinkerpop.machine.beam; -import org.apache.tinkerpop.machine.functions.BranchFunction; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.tinkerpop.machine.bytecode.Compilation; +import 
org.apache.tinkerpop.machine.functions.branch.RepeatBranch; import org.apache.tinkerpop.machine.traversers.Traverser; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public class BranchFn<C, S, E> extends AbstractFn<C, S, E> { +public class RepeatFn<C, S> extends AbstractFn<C, S, S> { - public BranchFn(final BranchFunction<C, S, E> branchFunction) { - super(branchFunction); + private final Compilation<C, S, ?> until; + private final TupleTag repeatDone; + private final TupleTag repeatLoop; + + + public RepeatFn(final RepeatBranch<C, S> repeatBranch, final TupleTag repeatDone, final TupleTag repeatLoop) { + super(repeatBranch); + this.until = repeatBranch.getUntil(); + this.repeatDone = repeatDone; + this.repeatLoop = repeatLoop; } @ProcessElement - public void processElement(final @Element Traverser<C, S> traverser, final OutputReceiver<Traverser<C, E>> output) { - throw new IllegalStateException("Branching is implementing using split/merge streams in Beam"); + public void processElement(final @DoFn.Element Traverser<C, S> traverser, final MultiOutputReceiver out) { + if (this.until.filterTraverser(traverser)) + out.get(this.repeatDone).output(traverser.clone()); + else + out.get(this.repeatLoop).output(traverser.clone()); } -} \ No newline at end of file +} diff --git a/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java b/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java index 246b678..7323f74 100644 --- a/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java +++ b/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java @@ -23,10 +23,12 @@ import org.apache.tinkerpop.language.Traversal; import org.apache.tinkerpop.language.TraversalSource; import org.apache.tinkerpop.language.TraversalUtil; import org.apache.tinkerpop.language.__; -import org.apache.tinkerpop.machine.coefficients.LongCoefficient; import 
org.apache.tinkerpop.machine.strategies.IdentityStrategy; import org.junit.jupiter.api.Test; +import static org.apache.tinkerpop.language.__.incr; +import static org.apache.tinkerpop.language.__.is; + /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ @@ -38,18 +40,18 @@ public class BeamTest { .withProcessor(BeamProcessor.class) .withStrategy(IdentityStrategy.class); - Traversal<Long, ?,?> traversal = g.inject(7L, 7L, 10L, 12L).identity().incr().groupCount().by(__.incr()); + Traversal<Long, ?, ?> traversal = g.inject(5L).repeat(incr()).until(is(10L)); System.out.println(TraversalUtil.getBytecode(traversal).getSourceInstructions()); System.out.println(TraversalUtil.getBytecode(traversal)); System.out.println(traversal); System.out.println(traversal.toList()); System.out.println("\n----------\n"); - traversal = g.inject(7L, 10L, 12L).as("a").c(3L).map(__.incr()).identity().incr().count(); + traversal = g.inject(7L, 10L, 12L).as("a").c(3L).map(incr()).identity().incr().count(); System.out.println(TraversalUtil.getBytecode(traversal)); System.out.println(traversal); System.out.println(traversal.toList()); System.out.println("\n----------\n"); - traversal = g.inject(7L).union(__.<Long>incr().incr().union(__.<Long>incr().as("b").identity().as("a"),__.<Long>incr().identity()),__.incr()); + traversal = g.inject(7L).union(__.<Long>incr().incr().union(__.<Long>incr().as("b").identity().as("a"), __.<Long>incr().identity()), __.incr()); System.out.println(TraversalUtil.getBytecode(traversal)); System.out.println(traversal); System.out.println(traversal.toList()); diff --git a/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java b/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java index 9302e5d..e12c439 100644 --- a/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java +++ b/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java @@ -27,6 +27,8 @@ 
import org.apache.tinkerpop.machine.coefficients.LongCoefficient; import org.apache.tinkerpop.machine.strategies.IdentityStrategy; import org.junit.jupiter.api.Test; +import java.util.Arrays; + import static org.apache.tinkerpop.language.__.incr; import static org.apache.tinkerpop.language.__.is; @@ -42,7 +44,7 @@ public class PipesTest { .withProcessor(PipesProcessor.class) .withStrategy(IdentityStrategy.class); - Traversal<Long, ?, ?> traversal = g.inject(1L,1L).repeat(incr()).until(is(10L)); + Traversal<Long, ?, ?> traversal = g.inject(Arrays.asList(1L, 1L)).<Long>unfold().repeat(incr()).until(is(10L)); System.out.println(TraversalUtil.getBytecode(traversal)); System.out.println(traversal); System.out.println(traversal.toList());
