This is an automated email from the ASF dual-hosted git repository. okram pushed a commit to branch tp4 in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit bfa48cbe9cdcaa53faa2925128039e0917fca53d Author: Marko A. Rodriguez <okramma...@gmail.com> AuthorDate: Wed Mar 13 11:35:06 2019 -0600 BranchFuncions are no longer Java functions. Given that the implementation of branching is heavily dependent on processing engine semantics, BranchFunctions simply expose a branch selector and a Map of selector/traversal pairs. It is up to the processor provider to use that information to construct a branching topology. For Pipes, its pull-based. For Beam, its push-based. --- .../org/apache/tinkerpop/language/Symbols.java | 5 ++ .../org/apache/tinkerpop/language/Traversal.java | 5 ++ .../tinkerpop/machine/bytecode/BytecodeUtil.java | 5 ++ .../machine/functions/AbstractFunction.java | 2 +- .../machine/functions/BranchFunction.java | 5 +- .../machine/functions/branch/ChooseBranch.java | 14 +---- .../machine/functions/branch/RepeatBranch.java | 19 +----- .../machine/functions/branch/UnionBranch.java | 15 +---- .../machine/functions/filter/FilterFilter.java | 4 +- .../machine/functions/filter/HasKeyFilter.java | 2 +- .../functions/filter/HasKeyValueFilter.java | 2 +- .../machine/functions/filter/IdentityFilter.java | 2 +- .../machine/functions/filter/IsFilter.java | 2 +- .../machine/functions/flatmap/UnfoldFlatMap.java | 2 +- .../machine/functions/initial/InjectInitial.java | 2 +- .../tinkerpop/machine/functions/map/IncrMap.java | 2 +- .../tinkerpop/machine/functions/map/MapMap.java | 2 +- .../tinkerpop/machine/functions/map/PathMap.java | 2 +- .../machine/functions/reduce/CountReduce.java | 2 +- .../machine/functions/reduce/GroupCountReduce.java | 2 +- .../machine/functions/reduce/SumReduce.java | 2 +- .../tinkerpop/machine/traversers/Traverser.java | 4 -- .../apache/tinkerpop/machine/beam/BranchFn.java | 11 ++-- .../apache/tinkerpop/machine/beam/BeamTest.java | 5 ++ .../apache/tinkerpop/machine/pipes/BranchStep.java | 43 ++++++++----- .../org/apache/tinkerpop/machine/pipes/Pipes.java | 7 ++- .../apache/tinkerpop/machine/pipes/RepeatStep.java | 70 ++++++++++++++++++++++ .../apache/tinkerpop/machine/pipes/PipesTest.java | 2 +- 28 files changed, 152 insertions(+), 88 deletions(-) diff --git a/java/core/src/main/java/org/apache/tinkerpop/language/Symbols.java b/java/core/src/main/java/org/apache/tinkerpop/language/Symbols.java index efeeaad..5ec8eea 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/language/Symbols.java +++ b/java/core/src/main/java/org/apache/tinkerpop/language/Symbols.java @@ -34,6 +34,7 @@ public final class Symbols { // INSTRUCTION OPS + public static final String CHOOSE_IF_THEN = "chooseIfThen"; public static final String CHOOSE_IF_THEN_ELSE = "chooseIfThenElse"; public static final String COUNT = "count"; public static final String FILTER = "filter"; @@ -53,6 +54,10 @@ public final class Symbols { public Type getOpType(final String op) { switch (op) { + case CHOOSE_IF_THEN: + return Type.BRANCH; + case CHOOSE_IF_THEN_ELSE: + return Type.BRANCH; case COUNT: return Type.REDUCE; case FILTER: diff --git a/java/core/src/main/java/org/apache/tinkerpop/language/Traversal.java b/java/core/src/main/java/org/apache/tinkerpop/language/Traversal.java index 9a17bc5..5ba6f68 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/language/Traversal.java +++ b/java/core/src/main/java/org/apache/tinkerpop/language/Traversal.java @@ -68,6 +68,11 @@ public class Traversal<C, S, E> implements Iterator<E> { return (Traversal) this; } + public <R> Traversal<C, S, R> choose(final Traversal<C, E, ?> predicate, final Traversal<C, S, R> trueTraversal) { + this.bytecode.addInstruction(this.currentCoefficient, Symbols.CHOOSE_IF_THEN, predicate, trueTraversal); + return (Traversal) this; + } + public Traversal<C, S, Long> count() { this.bytecode.addInstruction(this.currentCoefficient, Symbols.COUNT); return (Traversal) this; diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java b/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java index b7110a3..6959063 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java @@ -137,6 +137,11 @@ public final class BytecodeUtil { final Coefficient<C> coefficient = instruction.coefficient(); final Set<String> labels = instruction.labels(); switch (op) { + case Symbols.CHOOSE_IF_THEN: + return new ChooseBranch<>(coefficient, labels, + Compilation.compileOne(instruction.args()[0]), + Compilation.compileOne(instruction.args()[1]), + null); case Symbols.CHOOSE_IF_THEN_ELSE: return new ChooseBranch<>(coefficient, labels, Compilation.compileOne(instruction.args()[0]), diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/AbstractFunction.java b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/AbstractFunction.java index d9d149b..402389b 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/AbstractFunction.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/AbstractFunction.java @@ -26,7 +26,7 @@ import java.util.Set; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public abstract class AbstractFunction<C, S, E> implements CFunction<C> { +public abstract class AbstractFunction<C> implements CFunction<C> { private Coefficient<C> coefficient; private Set<String> labels; diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/BranchFunction.java b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/BranchFunction.java index 159b2c8..e2ecfe0 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/BranchFunction.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/BranchFunction.java @@ -20,17 +20,14 @@ package org.apache.tinkerpop.machine.functions; import org.apache.tinkerpop.machine.bytecode.Compilation; import org.apache.tinkerpop.machine.functions.branch.selector.Selector; -import org.apache.tinkerpop.machine.traversers.Traverser; -import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.function.Function; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public interface BranchFunction<C, S, E, M> extends Function<Traverser<C, S>, Iterator<Traverser<C, E>>>, InternalFunction<C> { +public interface BranchFunction<C, S, E, M> extends InternalFunction<C> { public Selector<C, S, M> getBranchSelector(); diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/ChooseBranch.java b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/ChooseBranch.java index c34763f..68d13f1 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/ChooseBranch.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/ChooseBranch.java @@ -24,12 +24,10 @@ import org.apache.tinkerpop.machine.functions.AbstractFunction; import org.apache.tinkerpop.machine.functions.BranchFunction; import org.apache.tinkerpop.machine.functions.branch.selector.HasNextSelector; import org.apache.tinkerpop.machine.functions.branch.selector.Selector; -import org.apache.tinkerpop.machine.traversers.Traverser; import org.apache.tinkerpop.util.StringFactory; import java.util.Collections; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -37,7 +35,7 @@ import java.util.Set; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public class ChooseBranch<C, S, E> extends AbstractFunction<C, S, Iterator<Traverser<C, E>>> implements BranchFunction<C, S, E, Boolean> { +public class ChooseBranch<C, S, E> extends AbstractFunction<C> implements BranchFunction<C, S, E, Boolean> { private final HasNextSelector<C, S> branchSelector; private final Map<Boolean, List<Compilation<C, S, E>>> branches; @@ -59,14 +57,8 @@ public class ChooseBranch<C, S, E> extends AbstractFunction<C, S, Iterator<Trave this.branchSelector = new HasNextSelector<>(predicate); this.branches = new HashMap<>(); this.branches.put(Boolean.TRUE, Collections.singletonList(trueBranch)); - this.branches.put(Boolean.FALSE, Collections.singletonList(falseBranch)); - } - - @Override - public Iterator<Traverser<C, E>> apply(final Traverser<C, S> traverser) { - return this.predicate.filterTraverser(traverser) ? - this.trueBranch.addTraverser(traverser) : - this.falseBranch.addTraverser(traverser); + if (null != this.falseBranch) + this.branches.put(Boolean.FALSE, Collections.singletonList(falseBranch)); } @Override diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/RepeatBranch.java b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/RepeatBranch.java index cdbbad1..500b592 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/RepeatBranch.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/RepeatBranch.java @@ -23,12 +23,8 @@ import org.apache.tinkerpop.machine.coefficients.Coefficient; import org.apache.tinkerpop.machine.functions.AbstractFunction; import org.apache.tinkerpop.machine.functions.BranchFunction; import org.apache.tinkerpop.machine.functions.branch.selector.Selector; -import org.apache.tinkerpop.machine.processor.Processor; -import org.apache.tinkerpop.machine.traversers.Traverser; -import org.apache.tinkerpop.util.IteratorUtils; import org.apache.tinkerpop.util.StringFactory; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -36,7 +32,7 @@ import java.util.Set; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public class RepeatBranch<C, S> extends AbstractFunction<C, S, Iterator<Traverser<C, S>>> implements BranchFunction<C, S, S, Boolean> { +public class RepeatBranch<C, S> extends AbstractFunction<C> implements BranchFunction<C, S, S, Boolean> { private final Compilation<C, S, S> repeat; private final Compilation<C, S, ?> until; @@ -48,19 +44,6 @@ public class RepeatBranch<C, S> extends AbstractFunction<C, S, Iterator<Traverse } @Override - public Iterator<Traverser<C, S>> apply(final Traverser<C, S> traverser) { - final Processor<C, S, S> repeatProcessor = this.repeat.getProcessor(); - repeatProcessor.addStart(traverser); - return IteratorUtils.filter(repeatProcessor, t -> { - if (!this.until.filterTraverser(t)) { - repeatProcessor.addStart(t); - return false; - } else - return true; - }); - } - - @Override public String toString() { return StringFactory.makeFunctionString(this, this.repeat, this.until); } diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/UnionBranch.java b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/UnionBranch.java index 3195a40..3381f73 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/UnionBranch.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/UnionBranch.java @@ -24,12 +24,9 @@ import org.apache.tinkerpop.machine.functions.AbstractFunction; import org.apache.tinkerpop.machine.functions.BranchFunction; import org.apache.tinkerpop.machine.functions.branch.selector.Selector; import org.apache.tinkerpop.machine.functions.branch.selector.TrueSelector; -import org.apache.tinkerpop.machine.traversers.Traverser; -import org.apache.tinkerpop.util.MultiIterator; import org.apache.tinkerpop.util.StringFactory; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -37,11 +34,10 @@ import java.util.Set; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public final class UnionBranch<C, S, E> extends AbstractFunction<C, S, Iterator<Traverser<C, E>>> implements BranchFunction<C, S, E, Boolean> { +public final class UnionBranch<C, S, E> extends AbstractFunction<C> implements BranchFunction<C, S, E, Boolean> { private final Map<Boolean, List<Compilation<C, S, E>>> branches; - public UnionBranch(final Coefficient<C> coefficient, final Set<String> labels, final List<Compilation<C, S, E>> branches) { super(coefficient, labels); this.branches = new HashMap<>(); @@ -49,15 +45,6 @@ public final class UnionBranch<C, S, E> extends AbstractFunction<C, S, Iterator< } @Override - public Iterator<Traverser<C, E>> apply(final Traverser<C, S> traverser) { - final MultiIterator<Traverser<C, E>> iterator = new MultiIterator<>(); - for (final Compilation<C, S, E> branch : this.branches.get(Boolean.TRUE)) { - iterator.addIterator(branch.addTraverser(traverser)); - } - return iterator; - } - - @Override public String toString() { return StringFactory.makeFunctionString(this, this.branches.values().iterator().next()); // make a flat array } diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/FilterFilter.java b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/FilterFilter.java index 5f551c6..3c623eb 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/FilterFilter.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/FilterFilter.java @@ -26,14 +26,12 @@ import org.apache.tinkerpop.machine.functions.NestedFunction; import org.apache.tinkerpop.machine.traversers.Traverser; import org.apache.tinkerpop.util.StringFactory; -import java.util.Collections; -import java.util.List; import java.util.Set; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public final class FilterFilter<C, S> extends AbstractFunction<C, S, S> implements FilterFunction<C, S>, NestedFunction<C> { +public final class FilterFilter<C, S> extends AbstractFunction<C> implements FilterFunction<C, S>, NestedFunction<C> { private final Compilation<C, S, ?> internalFilter; diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/HasKeyFilter.java b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/HasKeyFilter.java index 09320a7..63e2b71 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/HasKeyFilter.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/HasKeyFilter.java @@ -31,7 +31,7 @@ import java.util.Set; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public final class HasKeyFilter<C, K, V> extends AbstractFunction<C, Map<K, V>, Map<K, V>> implements FilterFunction<C, Map<K, V>> { +public final class HasKeyFilter<C, K, V> extends AbstractFunction<C> implements FilterFunction<C, Map<K, V>> { private final Argument<K> key; diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/HasKeyValueFilter.java b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/HasKeyValueFilter.java index 70aecc3..c713c55 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/HasKeyValueFilter.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/HasKeyValueFilter.java @@ -31,7 +31,7 @@ import java.util.Set; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public final class HasKeyValueFilter<C, K, V> extends AbstractFunction<C, Map<K, V>, Map<K, V>> implements FilterFunction<C, Map<K, V>> { +public final class HasKeyValueFilter<C, K, V> extends AbstractFunction<C> implements FilterFunction<C, Map<K, V>> { private final Argument<K> key; private final Argument<V> value; diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/IdentityFilter.java b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/IdentityFilter.java index 4d479ca..418fd6e 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/IdentityFilter.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/IdentityFilter.java @@ -28,7 +28,7 @@ import java.util.Set; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public final class IdentityFilter<C, S> extends AbstractFunction<C, S, S> implements FilterFunction<C, S> { +public final class IdentityFilter<C, S> extends AbstractFunction<C> implements FilterFunction<C, S> { public IdentityFilter(final Coefficient<C> coefficient, final Set<String> labels) { super(coefficient, labels); diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/IsFilter.java b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/IsFilter.java index 129f40f..b556bc7 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/IsFilter.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/IsFilter.java @@ -29,7 +29,7 @@ import java.util.Set; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public final class IsFilter<C, S> extends AbstractFunction<C, S, S> implements FilterFunction<C, S> { +public final class IsFilter<C, S> extends AbstractFunction<C> implements FilterFunction<C, S> { private S object; diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/flatmap/UnfoldFlatMap.java b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/flatmap/UnfoldFlatMap.java index 807ae39..ff05c8a 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/flatmap/UnfoldFlatMap.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/flatmap/UnfoldFlatMap.java @@ -33,7 +33,7 @@ import java.util.Set; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public class UnfoldFlatMap<C, S, E> extends AbstractFunction<C, S, E> implements FlatMapFunction<C, S, E> { +public class UnfoldFlatMap<C, S, E> extends AbstractFunction<C> implements FlatMapFunction<C, S, E> { public UnfoldFlatMap(final Coefficient<C> coefficient, final Set<String> labels) { super(coefficient, labels); diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/initial/InjectInitial.java b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/initial/InjectInitial.java index 4c15206..1f39997 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/initial/InjectInitial.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/initial/InjectInitial.java @@ -30,7 +30,7 @@ import java.util.stream.Stream; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public class InjectInitial<C, S> extends AbstractFunction<C, S, S> implements InitialFunction<C, S> { +public class InjectInitial<C, S> extends AbstractFunction<C> implements InitialFunction<C, S> { private final S[] objects; diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/IncrMap.java b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/IncrMap.java index e1eff44..15a193e 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/IncrMap.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/IncrMap.java @@ -28,7 +28,7 @@ import java.util.Set; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public class IncrMap<C> extends AbstractFunction<C, Long, Long> implements MapFunction<C, Long, Long> { +public class IncrMap<C> extends AbstractFunction<C> implements MapFunction<C, Long, Long> { public IncrMap(final Coefficient<C> coefficient, final Set<String> labels) { super(coefficient, labels); diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/MapMap.java b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/MapMap.java index b0d0fc5..4e2185e 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/MapMap.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/MapMap.java @@ -33,7 +33,7 @@ import java.util.Set; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public class MapMap<C, S, E> extends AbstractFunction<C, S, E> implements MapFunction<C, S, E>, NestedFunction<C> { +public class MapMap<C, S, E> extends AbstractFunction<C> implements MapFunction<C, S, E>, NestedFunction<C> { private final Compilation<C, S, E> internalMap; diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/PathMap.java b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/PathMap.java index 8e66ccd..dbf2a51 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/PathMap.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/PathMap.java @@ -32,7 +32,7 @@ import java.util.Set; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public class PathMap<C, S> extends AbstractFunction<C, S, Path> implements MapFunction<C, S, Path> { +public class PathMap<C, S> extends AbstractFunction<C> implements MapFunction<C, S, Path> { private final CompilationRing<C, Object, Object> compilationRing; diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/CountReduce.java b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/CountReduce.java index 65dde8e..f96e4e4 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/CountReduce.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/CountReduce.java @@ -28,7 +28,7 @@ import java.util.Set; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public class CountReduce<C, S> extends AbstractFunction<C, S, Long> implements ReduceFunction<C, S, Long> { +public class CountReduce<C, S> extends AbstractFunction<C> implements ReduceFunction<C, S, Long> { public CountReduce(final Coefficient<C> coefficient, final Set<String> labels) { super(coefficient, labels); diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/GroupCountReduce.java b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/GroupCountReduce.java index d9f713f..1bed8d9 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/GroupCountReduce.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/GroupCountReduce.java @@ -34,7 +34,7 @@ import java.util.Set; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public class GroupCountReduce<C, S, E> extends AbstractFunction<C, S, Map<E, Long>> implements ReduceFunction<C, S, Map<E, Long>>, NestedFunction<C> { +public class GroupCountReduce<C, S, E> extends AbstractFunction<C> implements ReduceFunction<C, S, Map<E, Long>>, NestedFunction<C> { private final Compilation<C, S, E> byCompilation; diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/SumReduce.java b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/SumReduce.java index 8963c2f..bd08c0e 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/SumReduce.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/SumReduce.java @@ -29,7 +29,7 @@ import java.util.Set; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public class SumReduce<C, S extends Number> extends AbstractFunction<C, S, S> implements ReduceFunction<C, S, S> { +public class SumReduce<C, S extends Number> extends AbstractFunction<C> implements ReduceFunction<C, S, S> { public SumReduce(final Coefficient<C> coefficient, final Set<String> labels) { super(coefficient, labels); diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/Traverser.java b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/Traverser.java index 1a4ed56..1174206 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/Traverser.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/Traverser.java @@ -59,10 +59,6 @@ public interface Traverser<C, S> extends Serializable, Cloneable { return IteratorUtils.map(function.apply(this), e -> this.split(function, e)); } - public default <E,M> Iterator<Traverser<C, E>> branch(final BranchFunction<C, S, E,M> function) { - return function.apply(this); - } - //public default void sideEffect(final SideEffectFunction<C,S> function); public default <E> Traverser<C, E> reduce(final ReduceFunction<C, S, E> function, final E reducedValue) { diff --git a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BranchFn.java b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BranchFn.java index f5560d3..1159553 100644 --- a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BranchFn.java +++ b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BranchFn.java @@ -41,10 +41,13 @@ public class BranchFn<C, S, E, M> extends AbstractFn<C, S, S> { } @ProcessElement - public void processElement(final @Element Traverser<C, S> traverser, final MultiOutputReceiver out) { - final Optional<M> selector = this.branchSelector.from(traverser); - if (selector.isPresent()) - out.get(this.branches.get(selector.get())).output(traverser.clone()); + public void processElement(final @Element Traverser<C, S> traverser, final MultiOutputReceiver outputs) { + final Optional<M> selector = this.branchSelector.from(traverser.clone()); + if (selector.isPresent()) { + final TupleTag<Traverser<C, S>> outputTag = this.branches.get(selector.get()); + if (null != outputTag) + outputs.get(outputTag).output(traverser.clone()); + } } } \ No newline at end of file diff --git a/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java b/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java index b161995..d04bac1 100644 --- a/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java +++ b/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java @@ -62,5 +62,10 @@ public class BeamTest { System.out.println(TraversalUtil.getBytecode(traversal)); System.out.println(traversal); System.out.println(traversal.toList()); + + traversal = g.inject(8L).choose(__.is(7L),__.incr()); + System.out.println(TraversalUtil.getBytecode(traversal)); + System.out.println(traversal); + System.out.println(traversal.toList()); } } diff --git a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/BranchStep.java b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/BranchStep.java index fec154a..987be52 100644 --- a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/BranchStep.java +++ b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/BranchStep.java @@ -18,41 +18,56 @@ */ package org.apache.tinkerpop.machine.pipes; +import org.apache.tinkerpop.machine.bytecode.Compilation; import org.apache.tinkerpop.machine.functions.BranchFunction; +import org.apache.tinkerpop.machine.functions.branch.selector.Selector; import org.apache.tinkerpop.machine.traversers.Traverser; +import org.apache.tinkerpop.util.MultiIterator; import java.util.Collections; import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ public final class BranchStep<C, S, E, M> extends AbstractStep<C, S, E> { - private final BranchFunction<C, S, E, M> branchFunction; - private Iterator<Traverser<C, E>> iterator = Collections.emptyIterator(); + private final Selector<C, S, M> branchSelector; + private final Map<M, List<Compilation<C, S, E>>> branches; + private Iterator<Traverser<C, E>> output = Collections.emptyIterator(); public BranchStep(final AbstractStep<C, ?, S> previousStep, final BranchFunction<C, S, E, M> branchFunction) { super(previousStep, branchFunction); - this.branchFunction = branchFunction; + this.branchSelector = branchFunction.getBranchSelector(); + this.branches = branchFunction.getBranches(); } @Override public boolean hasNext() { - while (true) { - if (this.iterator.hasNext()) - return true; - else if (super.hasNext()) - this.iterator = super.getPreviousTraverser().branch(this.branchFunction); - else - return false; - } + this.stageOutput(); + return this.output.hasNext(); } @Override public Traverser<C, E> next() { - if (!this.iterator.hasNext()) - this.iterator = super.getPreviousTraverser().branch(this.branchFunction); - return this.iterator.next(); + this.stageOutput(); + return this.output.next(); } + + private final void stageOutput() { + while (!this.output.hasNext() && super.hasNext()) { + final Traverser<C, S> traverser = super.getPreviousTraverser(); + final Optional<M> token = this.branchSelector.from(traverser); + if (token.isPresent()) { + this.output = new MultiIterator<>(); + for (final Compilation<C, S, E> branch : this.branches.get(token.get())) { + ((MultiIterator<Traverser<C, E>>) this.output).addIterator(branch.addTraverser(traverser.clone())); + } + } + } + } + } \ No newline at end of file diff --git a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java index 24e0cd4..f04be72 100644 --- a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java +++ b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java @@ -26,6 +26,7 @@ import org.apache.tinkerpop.machine.functions.FlatMapFunction; import org.apache.tinkerpop.machine.functions.InitialFunction; import org.apache.tinkerpop.machine.functions.MapFunction; import org.apache.tinkerpop.machine.functions.ReduceFunction; +import org.apache.tinkerpop.machine.functions.branch.RepeatBranch; import org.apache.tinkerpop.machine.pipes.util.BasicReducer; import org.apache.tinkerpop.machine.processor.Processor; import org.apache.tinkerpop.machine.traversers.Traverser; @@ -46,8 +47,10 @@ public final class Pipes<C, S, E> implements Processor<C, S, E> { AbstractStep<C, ?, ?> previousStep = EmptyStep.instance(); for (final CFunction<?> function : compilation.getFunctions()) { final AbstractStep nextStep; - if (function instanceof BranchFunction) - nextStep = new BranchStep(previousStep, (BranchFunction<C, ?, ?,?>) function); + if (function instanceof RepeatBranch) + nextStep = new RepeatStep(previousStep, (RepeatBranch<C, ?>) function); + else if (function instanceof BranchFunction) + nextStep = new BranchStep(previousStep, (BranchFunction<C, ?, ?, ?>) function); else if (function instanceof FilterFunction) nextStep = new FilterStep(previousStep, (FilterFunction<C, ?>) function); else if (function instanceof FlatMapFunction) diff --git a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/RepeatStep.java b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/RepeatStep.java new file mode 100644 index 0000000..a303208 --- /dev/null +++ b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/RepeatStep.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.machine.pipes; + +import org.apache.tinkerpop.machine.bytecode.Compilation; +import org.apache.tinkerpop.machine.functions.branch.RepeatBranch; +import org.apache.tinkerpop.machine.traversers.Traverser; +import org.apache.tinkerpop.util.IteratorUtils; + +import java.util.Iterator; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class RepeatStep<C, S> extends AbstractStep<C, S, S> { + + private final Compilation<C, S, ?> until; + private final Compilation<C, S, S> repeat; + private final Iterator<Traverser<C, S>> output; + + public RepeatStep(final AbstractStep<C, ?, S> previousStep, final RepeatBranch<C, S> repeatFunction) { + super(previousStep, repeatFunction); + this.until = repeatFunction.getUntil(); + this.repeat = repeatFunction.getRepeat(); + this.output = IteratorUtils.filter(this.repeat.getProcessor(), t -> { + if (!this.until.filterTraverser(t)) { + this.repeat.getProcessor().addStart(t); + return false; + } else + return true; + }); + } + + @Override + public boolean hasNext() { + this.stageOutput(); + return this.output.hasNext(); + } + + @Override + public Traverser<C, S> next() { + this.stageOutput(); + return this.output.next(); + } + + private final void stageOutput() { + while (!this.output.hasNext() && super.hasNext()) { + this.repeat.addTraverser(super.getPreviousTraverser()); + } + + } + +} + diff --git a/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java b/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java index d51b966..81374a4 100644 --- a/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java +++ b/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java @@ -54,7 +54,7 @@ public class PipesTest { System.out.println(traversal); System.out.println(traversal.toList()); System.out.println("\n----------\n"); - traversal = g.inject(8L).choose(__.is(7L),__.incr(),__.<Long>incr().incr()); + traversal = g.inject(7L).choose(__.is(7L),__.incr()); System.out.println(TraversalUtil.getBytecode(traversal)); System.out.println(traversal); System.out.println(traversal.toList());