This is an automated email from the ASF dual-hosted git repository. okram pushed a commit to branch tp4 in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit 49c7d208b85955b2f4906b21007da25b338f0b6c Author: Marko A. Rodriguez <okramma...@gmail.com> AuthorDate: Mon Mar 11 05:42:27 2019 -0600 have the first stub of Apache Beam working in TP4. I was in the most nightmarish hell for the longest time --- I didn't have the machine pom.xml have beam as a submodule so changes I did to core did nothing. Had this serialization error and I spent a good 2 hours to ultimately realize that my code changes werent taking effect. So bad. --- .../org/apache/tinkerpop/language/Symbols.java | 1 + .../org/apache/tinkerpop/language/Traversal.java | 5 + .../apache/tinkerpop/language/TraversalSource.java | 2 + .../tinkerpop/machine/bytecode/Bytecode.java | 36 ++++++- .../tinkerpop/machine/bytecode/BytecodeUtil.java | 3 + .../SourceInstruction.java} | 23 +++-- .../machine/coefficients/Coefficient.java | 4 +- .../machine/coefficients/LongCoefficient.java | 5 +- .../machine/functions/AbstractFunction.java | 12 +-- .../tinkerpop/machine/functions/CFunction.java | 3 +- .../machine/functions/InitialFunction.java | 4 +- .../tinkerpop/machine/functions/MapFunction.java | 3 +- .../{MapFunction.java => ReduceFunction.java} | 6 +- .../machine/functions/filter/IdentityFilter.java | 3 +- .../machine/functions/filter/IsFilter.java | 4 +- .../machine/functions/initial/InjectInitial.java | 5 +- .../tinkerpop/machine/functions/map/IncrMap.java | 5 +- .../tinkerpop/machine/functions/map/MapMap.java | 5 +- .../tinkerpop/machine/functions/map/PathMap.java | 4 +- .../{CFunction.java => reduce/BasicReducer.java} | 23 +++-- .../{map/IncrMap.java => reduce/CountReduce.java} | 17 ++-- .../{CFunction.java => reduce/Reducer.java} | 13 +-- .../{Traverser.java => CompleteTraverser.java} | 28 +++--- .../CompleteTraverserFactory.java} | 15 ++- .../apache/tinkerpop/machine/traversers/Path.java | 3 +- .../tinkerpop/machine/traversers/Traverser.java | 67 +++++++------ .../TraverserFactory.java} | 11 +-- .../tinkerpop/machine/traversers/TraverserSet.java | 6 +- java/machine/beam/pom.xml | 103 ++++++++++++++++++++ .../org/apache/tinkerpop/machine/beam/Beam.java | 106 +++++++++++++++++++++ .../tinkerpop/machine/beam/BeamProcessor.java} | 14 ++- .../apache/tinkerpop/machine/beam/FilterFn.java} | 25 +++-- .../org/apache/tinkerpop/machine/beam/MapFn.java} | 18 ++-- .../apache/tinkerpop/machine/beam/OutputStep.java} | 18 +++- .../tinkerpop/machine/beam/TraverserCoder.java | 65 +++++++++++++ .../apache/tinkerpop/machine/beam/BeamTest.java} | 15 +-- .../tinkerpop/machine/pipes/AbstractStep.java | 2 +- .../apache/tinkerpop/machine/pipes/EmptyStep.java | 4 +- .../apache/tinkerpop/machine/pipes/FilterStep.java | 37 ++++++- .../tinkerpop/machine/pipes/InitialStep.java | 9 +- .../apache/tinkerpop/machine/pipes/MapStep.java | 5 +- .../org/apache/tinkerpop/machine/pipes/Pipes.java | 14 ++- .../apache/tinkerpop/machine/pipes/ReduceStep.java | 63 ++++++++++++ .../apache/tinkerpop/machine/pipes/PipesTest.java | 7 +- java/machine/pom.xml | 1 + java/pom.xml | 3 +- 46 files changed, 642 insertions(+), 183 deletions(-) diff --git a/java/core/src/main/java/org/apache/tinkerpop/language/Symbols.java b/java/core/src/main/java/org/apache/tinkerpop/language/Symbols.java index 681e26e..b302b88 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/language/Symbols.java +++ b/java/core/src/main/java/org/apache/tinkerpop/language/Symbols.java @@ -25,6 +25,7 @@ public final class Symbols { public static final String AS = "as"; public static final String C = "c"; + public static final String COUNT = "count"; public static final String IDENTITY = "identity"; public static final String IS = "is"; public static final String INCR = "incr"; diff --git a/java/core/src/main/java/org/apache/tinkerpop/language/Traversal.java b/java/core/src/main/java/org/apache/tinkerpop/language/Traversal.java index 7acdd75..f8f446c 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/language/Traversal.java +++ b/java/core/src/main/java/org/apache/tinkerpop/language/Traversal.java @@ -62,6 +62,11 @@ public class Traversal<C, S, E> implements Iterator<E> { return this; } + public Traversal<C, S, Long> count() { + this.bytecode.addInstruction(this.currentCoefficient, Symbols.COUNT); + return (Traversal) this; + } + public Traversal<C, S, E> identity() { this.bytecode.addInstruction(this.currentCoefficient, Symbols.IDENTITY); return this; diff --git a/java/core/src/main/java/org/apache/tinkerpop/language/TraversalSource.java b/java/core/src/main/java/org/apache/tinkerpop/language/TraversalSource.java index be6df43..baf4069 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/language/TraversalSource.java +++ b/java/core/src/main/java/org/apache/tinkerpop/language/TraversalSource.java @@ -18,6 +18,7 @@ */ package org.apache.tinkerpop.language; +import org.apache.tinkerpop.machine.bytecode.Bytecode; import org.apache.tinkerpop.machine.coefficients.Coefficient; import org.apache.tinkerpop.machine.coefficients.LongCoefficient; import org.apache.tinkerpop.machine.compiler.Strategy; @@ -31,6 +32,7 @@ import java.util.List; */ public class TraversalSource<C> { + private Bytecode<C> bytecode = new Bytecode<>(); private Coefficient<C> coefficient; private ProcessorFactory factory; private List<Strategy> strategies = new ArrayList<>(); diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/Bytecode.java b/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/Bytecode.java index 6b53da5..3310070 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/Bytecode.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/Bytecode.java @@ -19,6 +19,9 @@ package org.apache.tinkerpop.machine.bytecode; import org.apache.tinkerpop.machine.coefficients.Coefficient; +import org.apache.tinkerpop.machine.compiler.Strategy; +import org.apache.tinkerpop.machine.traversers.CompleteTraverserFactory; +import org.apache.tinkerpop.machine.traversers.TraverserFactory; import java.util.ArrayList; import java.util.List; @@ -26,14 +29,22 @@ import java.util.List; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public class Bytecode<C> { +public class Bytecode<C> implements Cloneable { - public List<Instruction<C>> instructions; + public List<Strategy> strategies = new ArrayList<>(); + public List<Instruction<C>> instructions = new ArrayList<>(); - public Bytecode() { - this.instructions = new ArrayList<>(); + + public void addStrategy(final Strategy strategy) { + this.strategies.add(strategy); + } + + public List<Strategy> getStrategies() { + return this.strategies; } + /// + public void addInstruction(final Coefficient<C> coefficient, final String op, final Object... args) { this.instructions.add(new Instruction<>(coefficient.clone(), op, args)); coefficient.unity(); @@ -51,8 +62,25 @@ public class Bytecode<C> { return this.instructions.get(this.instructions.size() - 1); } + // this should be part of processor! + public <S> TraverserFactory<C, S> getTraverserFactory() { + return new CompleteTraverserFactory<>(); + } + @Override public String toString() { return this.instructions.toString(); } + + @Override + public Bytecode<C> clone() { + try { + final Bytecode<C> clone = (Bytecode<C>) super.clone(); + clone.strategies = new ArrayList<>(this.strategies); + clone.instructions = new ArrayList<>(this.instructions); + return clone; + } catch (final CloneNotSupportedException e) { + throw new RuntimeException(e.getMessage(), e); + } + } } diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java b/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java index 8f560fb..2f29bba 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java @@ -27,6 +27,7 @@ import org.apache.tinkerpop.machine.functions.initial.InjectInitial; import org.apache.tinkerpop.machine.functions.map.IncrMap; import org.apache.tinkerpop.machine.functions.map.MapMap; import org.apache.tinkerpop.machine.functions.map.PathMap; +import org.apache.tinkerpop.machine.functions.reduce.CountReduce; import java.util.ArrayList; import java.util.List; @@ -60,6 +61,8 @@ public final class BytecodeUtil { final Coefficient<C> coefficient = instruction.coefficient(); final Set<String> labels = instruction.labels(); switch (op) { + case Symbols.COUNT: + return new CountReduce<>(coefficient, labels); case Symbols.IDENTITY: return new IdentityFilter<>(coefficient, labels); case Symbols.INJECT: diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/CFunction.java b/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/SourceInstruction.java similarity index 69% copy from java/core/src/main/java/org/apache/tinkerpop/machine/functions/CFunction.java copy to java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/SourceInstruction.java index 144b77f..48fc0f0 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/CFunction.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/SourceInstruction.java @@ -16,19 +16,26 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.tinkerpop.machine.functions; - -import org.apache.tinkerpop.machine.coefficients.Coefficient; - -import java.util.Set; +package org.apache.tinkerpop.machine.bytecode; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public interface CFunction<C> { +public final class SourceInstruction { + + private final String op; + private final Object[] args; - public Coefficient<C> coefficient(); + public SourceInstruction(final String op, final Object[] args) { + this.op = op; + this.args = args; + } - public Set<String> labels(); + public String op() { + return this.op; + } + public Object[] args() { + return this.args; + } } diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/coefficients/Coefficient.java b/java/core/src/main/java/org/apache/tinkerpop/machine/coefficients/Coefficient.java index bfad9d3..83c2ef4 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/coefficients/Coefficient.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/coefficients/Coefficient.java @@ -18,10 +18,12 @@ */ package org.apache.tinkerpop.machine.coefficients; +import java.io.Serializable; + /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public interface Coefficient<C> extends Cloneable { +public interface Coefficient<C> extends Cloneable, Serializable { public Coefficient<C> sum(final C other); diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/coefficients/LongCoefficient.java b/java/core/src/main/java/org/apache/tinkerpop/machine/coefficients/LongCoefficient.java index 8762553..f322383 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/coefficients/LongCoefficient.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/coefficients/LongCoefficient.java @@ -23,12 +23,15 @@ package org.apache.tinkerpop.machine.coefficients; */ public class LongCoefficient implements Coefficient<Long> { - private Long value; + protected Long value; private LongCoefficient(final Long value) { this.value = value; } + public LongCoefficient() { + } + @Override public Coefficient<Long> sum(final Long other) { this.value = this.value + other; diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/AbstractFunction.java b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/AbstractFunction.java index 5323696..d9d149b 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/AbstractFunction.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/AbstractFunction.java @@ -19,7 +19,6 @@ package org.apache.tinkerpop.machine.functions; import org.apache.tinkerpop.machine.coefficients.Coefficient; -import org.apache.tinkerpop.machine.traversers.Traverser; import org.apache.tinkerpop.util.StringFactory; import java.util.Set; @@ -29,7 +28,7 @@ import java.util.Set; */ public abstract class AbstractFunction<C, S, E> implements CFunction<C> { - protected final Coefficient<C> coefficient; + private Coefficient<C> coefficient; private Set<String> labels; public AbstractFunction(final Coefficient<C> coefficient, final Set<String> labels) { @@ -47,15 +46,6 @@ public abstract class AbstractFunction<C, S, E> implements CFunction<C> { return this.labels; } - protected Traverser<C, E> postProcess(final Traverser<C, E> traverser) { - traverser.coefficient().multiply(this.coefficient.value()); - for (final String label : this.labels) { - traverser.addLabel(label); - } - return traverser; - } - - @Override public String toString() { return StringFactory.makeFunctionString(this); diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/CFunction.java b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/CFunction.java index 144b77f..568c713 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/CFunction.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/CFunction.java @@ -20,12 +20,13 @@ package org.apache.tinkerpop.machine.functions; import org.apache.tinkerpop.machine.coefficients.Coefficient; +import java.io.Serializable; import java.util.Set; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public interface CFunction<C> { +public interface CFunction<C> extends Serializable { public Coefficient<C> coefficient(); diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/InitialFunction.java b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/InitialFunction.java index d44c47b..4174e16 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/InitialFunction.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/InitialFunction.java @@ -18,13 +18,11 @@ */ package org.apache.tinkerpop.machine.functions; -import org.apache.tinkerpop.machine.traversers.Traverser; - import java.util.Iterator; import java.util.function.Supplier; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public interface InitialFunction<C, S> extends Supplier<Iterator<Traverser<C, S>>>, CFunction<C> { +public interface InitialFunction<C, S> extends Supplier<Iterator<S>>, CFunction<C> { } diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/MapFunction.java b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/MapFunction.java index f5b5302..d7cb898 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/MapFunction.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/MapFunction.java @@ -20,11 +20,12 @@ package org.apache.tinkerpop.machine.functions; import org.apache.tinkerpop.machine.traversers.Traverser; +import java.io.Serializable; import java.util.function.Function; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public interface MapFunction<C, S, E> extends Function<Traverser<C, S>, Traverser<C, E>>, CFunction<C> { +public interface MapFunction<C, S, E> extends Function<Traverser<C, S>, E>, CFunction<C> { } diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/MapFunction.java b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/ReduceFunction.java similarity index 85% copy from java/core/src/main/java/org/apache/tinkerpop/machine/functions/MapFunction.java copy to java/core/src/main/java/org/apache/tinkerpop/machine/functions/ReduceFunction.java index f5b5302..c2ceb0d 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/MapFunction.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/ReduceFunction.java @@ -20,11 +20,13 @@ package org.apache.tinkerpop.machine.functions; import org.apache.tinkerpop.machine.traversers.Traverser; -import java.util.function.Function; +import java.util.function.BiFunction; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public interface MapFunction<C, S, E> extends Function<Traverser<C, S>, Traverser<C, E>>, CFunction<C> { +public interface ReduceFunction<C, S, E> extends BiFunction<Traverser<C, S>, E, E>, CFunction<C> { + public E getInitialValue(); } + diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/IdentityFilter.java b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/IdentityFilter.java index 544f674..e630242 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/IdentityFilter.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/IdentityFilter.java @@ -28,7 +28,7 @@ import java.util.Set; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public class IdentityFilter<C, S> extends AbstractFunction<C,S,S> implements FilterFunction<C, S> { +public class IdentityFilter<C, S> extends AbstractFunction<C, S, S> implements FilterFunction<C, S> { public IdentityFilter(final Coefficient<C> coefficient, final Set<String> labels) { super(coefficient, labels); @@ -36,7 +36,6 @@ public class IdentityFilter<C, S> extends AbstractFunction<C,S,S> implements Fil @Override public boolean test(final Traverser<C, S> traverser) { - super.postProcess(traverser); return true; } } diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/IsFilter.java b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/IsFilter.java index c0f2a98..cbdd8ed 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/IsFilter.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/IsFilter.java @@ -21,6 +21,7 @@ package org.apache.tinkerpop.machine.functions.filter; import org.apache.tinkerpop.machine.coefficients.Coefficient; import org.apache.tinkerpop.machine.functions.AbstractFunction; import org.apache.tinkerpop.machine.functions.FilterFunction; +import org.apache.tinkerpop.machine.traversers.CompleteTraverser; import org.apache.tinkerpop.machine.traversers.Traverser; import org.apache.tinkerpop.util.StringFactory; @@ -31,7 +32,7 @@ import java.util.Set; */ public class IsFilter<C, S> extends AbstractFunction<C, S, S> implements FilterFunction<C, S> { - private final S object; + private S object; public IsFilter(final Coefficient<C> coefficient, final Set<String> labels, final S object) { super(coefficient, labels); @@ -40,7 +41,6 @@ public class IsFilter<C, S> extends AbstractFunction<C, S, S> implements FilterF @Override public boolean test(final Traverser<C, S> traverser) { - super.postProcess(traverser); return traverser.object().equals(this.object); } diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/initial/InjectInitial.java b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/initial/InjectInitial.java index de93cc5..4c15206 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/initial/InjectInitial.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/initial/InjectInitial.java @@ -21,7 +21,6 @@ package org.apache.tinkerpop.machine.functions.initial; import org.apache.tinkerpop.machine.coefficients.Coefficient; import org.apache.tinkerpop.machine.functions.AbstractFunction; import org.apache.tinkerpop.machine.functions.InitialFunction; -import org.apache.tinkerpop.machine.traversers.Traverser; import org.apache.tinkerpop.util.StringFactory; import java.util.Iterator; @@ -41,8 +40,8 @@ public class InjectInitial<C, S> extends AbstractFunction<C, S, S> implements In } @Override - public Iterator<Traverser<C, S>> get() { - return Stream.of(this.objects).map(object -> new Traverser<>(this.coefficient.clone(), object)).iterator(); + public Iterator<S> get() { + return Stream.of(this.objects).iterator(); } @Override diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/IncrMap.java b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/IncrMap.java index 1e5964f..e1eff44 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/IncrMap.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/IncrMap.java @@ -35,7 +35,8 @@ public class IncrMap<C> extends AbstractFunction<C, Long, Long> implements MapFu } @Override - public Traverser<C, Long> apply(final Traverser<C, Long> traverser) { - return postProcess(traverser.split(traverser.object() + 1)); + public Long apply(final Traverser<C, Long> traverser) { + return traverser.object() + 1L; } + } diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/MapMap.java b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/MapMap.java index f3681b6..8237fbf 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/MapMap.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/MapMap.java @@ -24,6 +24,7 @@ import org.apache.tinkerpop.machine.functions.CFunction; import org.apache.tinkerpop.machine.functions.MapFunction; import org.apache.tinkerpop.machine.functions.NestedFunction; import org.apache.tinkerpop.machine.processor.Processor; +import org.apache.tinkerpop.machine.traversers.CompleteTraverser; import org.apache.tinkerpop.machine.traversers.Traverser; import org.apache.tinkerpop.util.StringFactory; @@ -44,10 +45,10 @@ public class MapMap<C, S, E> extends AbstractFunction<C, S, E> implements MapFun } @Override - public Traverser<C, E> apply(final Traverser<C, S> traverser) { + public E apply(final Traverser<C, S> traverser) { this.processor.reset(); this.processor.addStart(traverser); - return super.postProcess(traverser.split(this.processor.next().object())); + return this.processor.next().object(); } public void setProcessor(final Processor<C, S, E> processor) { diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/PathMap.java b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/PathMap.java index 051d647..4c955d2 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/PathMap.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/PathMap.java @@ -36,7 +36,7 @@ public class PathMap<C, S> extends AbstractFunction<C, S, Path> implements MapFu } @Override - public Traverser<C, Path> apply(final Traverser<C, S> traverser) { - return super.postProcess(traverser.split(traverser.path())); + public Path apply(final Traverser<C, S> traverser) { + return traverser.path(); } } diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/CFunction.java b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/BasicReducer.java similarity index 71% copy from java/core/src/main/java/org/apache/tinkerpop/machine/functions/CFunction.java copy to java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/BasicReducer.java index 144b77f..40c00c1 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/CFunction.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/BasicReducer.java @@ -16,19 +16,26 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.tinkerpop.machine.functions; - -import org.apache.tinkerpop.machine.coefficients.Coefficient; - -import java.util.Set; +package org.apache.tinkerpop.machine.functions.reduce; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public interface CFunction<C> { +public class BasicReducer<S> implements Reducer<S> { + + private S value; + + public BasicReducer(final S initialValue) { + this.value = initialValue; + } + + public S get() { + return this.value; + } - public Coefficient<C> coefficient(); + public void update(final S newValue) { + this.value = newValue; + } - public Set<String> labels(); } diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/IncrMap.java b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/CountReduce.java similarity index 68% copy from java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/IncrMap.java copy to java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/CountReduce.java index 1e5964f..8c674c3 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/IncrMap.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/CountReduce.java @@ -16,11 +16,11 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.tinkerpop.machine.functions.map; +package org.apache.tinkerpop.machine.functions.reduce; import org.apache.tinkerpop.machine.coefficients.Coefficient; import org.apache.tinkerpop.machine.functions.AbstractFunction; -import org.apache.tinkerpop.machine.functions.MapFunction; +import org.apache.tinkerpop.machine.functions.ReduceFunction; import org.apache.tinkerpop.machine.traversers.Traverser; import java.util.Set; @@ -28,14 +28,19 @@ import java.util.Set; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public class IncrMap<C> extends AbstractFunction<C, Long, Long> implements MapFunction<C, Long, Long> { +public class CountReduce<C, S> extends AbstractFunction<C, S, Long> implements ReduceFunction<C, S, Long> { - public IncrMap(final Coefficient<C> coefficient, final Set<String> labels) { + public CountReduce(final Coefficient<C> coefficient, final Set<String> labels) { super(coefficient, labels); } @Override - public Traverser<C, Long> apply(final Traverser<C, Long> traverser) { - return postProcess(traverser.split(traverser.object() + 1)); + public Long apply(final Traverser<C, S> traverser, final Long currentValue) { + return currentValue + traverser.coefficient().count(); + } + + @Override + public Long getInitialValue() { + return 0L; } } diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/CFunction.java b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/Reducer.java similarity index 78% copy from java/core/src/main/java/org/apache/tinkerpop/machine/functions/CFunction.java copy to java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/Reducer.java index 144b77f..8434ca7 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/CFunction.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/Reducer.java @@ -16,19 +16,14 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.tinkerpop.machine.functions; - -import org.apache.tinkerpop.machine.coefficients.Coefficient; - -import java.util.Set; +package org.apache.tinkerpop.machine.functions.reduce; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public interface CFunction<C> { - - public Coefficient<C> coefficient(); +public interface Reducer<S> { - public Set<String> labels(); + public S get(); + public void update(final S newValue); } diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/Traverser.java b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/CompleteTraverser.java similarity index 70% copy from java/core/src/main/java/org/apache/tinkerpop/machine/traversers/Traverser.java copy to java/core/src/main/java/org/apache/tinkerpop/machine/traversers/CompleteTraverser.java index 4d2e92a..81d17fc 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/Traverser.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/CompleteTraverser.java @@ -20,19 +20,23 @@ package org.apache.tinkerpop.machine.traversers; import org.apache.tinkerpop.machine.coefficients.Coefficient; +import java.io.Serializable; import java.util.Collections; -import java.util.HashSet; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public class Traverser<C, S> { +public class CompleteTraverser<C, S> implements Traverser<C, S> { - private final Coefficient<C> coefficient; - private final S object; + private Coefficient<C> coefficient; + private S object; private Path path = new Path(); - public Traverser(final Coefficient<C> coefficient, final S object) { + public CompleteTraverser() { + + } + + public CompleteTraverser(final Coefficient<C> coefficient, final S object) { this.coefficient = coefficient; this.object = object; } @@ -53,20 +57,22 @@ public class Traverser<C, S> { this.path.addLabels(Collections.singleton(label)); } - public <B> Traverser<C, B> split(final B object) { - final Traverser<C, B> traverser = new Traverser<>(this.coefficient.clone(), object); - traverser.path = new Path(this.path); - traverser.path.add(new HashSet<>(), object); - return traverser; + @Override + public <E> Traverser<C, E> split(final Coefficient<C> eCoefficient, final E eObject) { + final CompleteTraverser<C, E> clone = new CompleteTraverser<>(eCoefficient, eObject); + clone.path = new Path(this.path); + return clone; } @Override public boolean equals(final Object other) { - return other instanceof Traverser && ((Traverser<C, S>) other).object.equals(this.object); + return other instanceof CompleteTraverser && ((CompleteTraverser<C, S>) other).object.equals(this.object); } @Override public String toString() { return this.object.toString(); } + + } diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/CFunction.java b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/CompleteTraverserFactory.java similarity index 75% copy from java/core/src/main/java/org/apache/tinkerpop/machine/functions/CFunction.java copy to java/core/src/main/java/org/apache/tinkerpop/machine/traversers/CompleteTraverserFactory.java index 144b77f..7d9356f 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/CFunction.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/CompleteTraverserFactory.java @@ -16,19 +16,16 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.tinkerpop.machine.functions; +package org.apache.tinkerpop.machine.traversers; import org.apache.tinkerpop.machine.coefficients.Coefficient; -import java.util.Set; - /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public interface CFunction<C> { - - public Coefficient<C> coefficient(); - - public Set<String> labels(); - +public class CompleteTraverserFactory<C, S> implements TraverserFactory<C, S> { + @Override + public Traverser<C, S> create(final Coefficient<C> coefficient, final S object) { + return new CompleteTraverser<>(coefficient.clone(), object); + } } diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/Path.java b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/Path.java index d318971..7509824 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/Path.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/Path.java @@ -18,6 +18,7 @@ */ package org.apache.tinkerpop.machine.traversers; +import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.Set; @@ -25,7 +26,7 @@ import java.util.Set; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public class Path { +public class Path implements Serializable { private final List<Object> objects = new ArrayList<>(); private final List<Set<String>> labels = new ArrayList<>(); diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/Traverser.java b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/Traverser.java index 4d2e92a..27c9f92 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/Traverser.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/Traverser.java @@ -19,54 +19,63 @@ package org.apache.tinkerpop.machine.traversers; import org.apache.tinkerpop.machine.coefficients.Coefficient; +import org.apache.tinkerpop.machine.functions.FilterFunction; +import org.apache.tinkerpop.machine.functions.FlatMapFunction; +import org.apache.tinkerpop.machine.functions.MapFunction; +import org.apache.tinkerpop.machine.functions.reduce.Reducer; +import java.io.Serializable; import java.util.Collections; -import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public class Traverser<C, S> { +public interface Traverser<C, S> extends Serializable { - private final Coefficient<C> coefficient; - private final S object; - private Path path = new Path(); + public Coefficient<C> coefficient(); - public Traverser(final Coefficient<C> coefficient, final S object) { - this.coefficient = coefficient; - this.object = object; - } + public S object(); - public Coefficient<C> coefficient() { - return this.coefficient; - } + public Path path(); - public S object() { - return this.object; - } + public void addLabel(final String label); - public Path path() { - return this.path; + public default void addLabels(final Set<String> labels) { + for (final String label : labels) { + this.addLabel(label); + } } - public void addLabel(final String label) { - this.path.addLabels(Collections.singleton(label)); + public default boolean filter(final FilterFunction<C, S> function) { + if (function.test(this)) { + this.coefficient().multiply(function.coefficient().value()); + this.addLabels(function.labels()); + return true; + } else { + return false; + } } - public <B> Traverser<C, B> split(final B object) { - final Traverser<C, B> traverser = new Traverser<>(this.coefficient.clone(), object); - traverser.path = new Path(this.path); - traverser.path.add(new HashSet<>(), object); + public default <E> Traverser<C, E> map(final MapFunction<C, S, E> function) { + final Coefficient<C> eCoefficient = this.coefficient().clone().multiply(function.coefficient().value()); + final E eObject = function.apply(this); + final Traverser<C, E> traverser = this.split(eCoefficient, eObject); + traverser.addLabels(function.labels()); return traverser; } - @Override - public boolean equals(final Object other) { - return other instanceof Traverser && ((Traverser<C, S>) other).object.equals(this.object); + public default <E> Iterator<Traverser<C, E>> flatMap(final FlatMapFunction<C, S, E> function) { + return Collections.emptyIterator(); } - @Override - public String toString() { - return this.object.toString(); + //public default void sideEffect(final SideEffectFunction<C,S> function); + + public default <E> Traverser<C, E> reduce(final Reducer<E> reducer) { + final Traverser<C, E> traverser = this.split(this.coefficient().clone().unity(), reducer.get()); + return traverser; } + + public <E> Traverser<C, E> split(final Coefficient<C> coefficient, final E object); } diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/CFunction.java b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/TraverserFactory.java similarity index 83% copy from java/core/src/main/java/org/apache/tinkerpop/machine/functions/CFunction.java copy to java/core/src/main/java/org/apache/tinkerpop/machine/traversers/TraverserFactory.java index 144b77f..22f2911 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/CFunction.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/TraverserFactory.java @@ -16,19 +16,14 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.tinkerpop.machine.functions; +package org.apache.tinkerpop.machine.traversers; import org.apache.tinkerpop.machine.coefficients.Coefficient; -import java.util.Set; - /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public interface CFunction<C> { - - public Coefficient<C> coefficient(); - - public Set<String> labels(); +public interface TraverserFactory<C, S> { + public Traverser<C, S> create(final Coefficient<C> coefficient, final S object); } diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/TraverserSet.java b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/TraverserSet.java index a7c011b..ce52b63 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/TraverserSet.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/TraverserSet.java @@ -140,8 +140,8 @@ public class TraverserSet<C, S> extends AbstractSet<Traverser<C, S>> implements return this.map.values().toString(); } - /*public void sort(final Comparator<Traverser<S>> comparator) { - final List<Traverser<C, S>> list = new ArrayList<>(this.map.size()); + /*public void sort(final Comparator<CompleteTraverser<S>> comparator) { + final List<CompleteTraverser<C, S>> list = new ArrayList<>(this.map.size()); IteratorUtils.removeOnNext(this.map.values().iterator()).forEachRemaining(list::add); Collections.sort(list, comparator); this.map.reset(); @@ -149,7 +149,7 @@ public class TraverserSet<C, S> extends AbstractSet<Traverser<C, S>> implements } public void shuffle() { - final List<Traverser<C, S>> list = new ArrayList<>(this.map.size()); + final List<CompleteTraverser<C, S>> list = new ArrayList<>(this.map.size()); IteratorUtils.removeOnNext(this.map.values().iterator()).forEachRemaining(list::add); Collections.shuffle(list); this.map.reset(); diff --git a/java/machine/beam/pom.xml b/java/machine/beam/pom.xml new file mode 100644 index 0000000..2fb4c82 --- /dev/null +++ b/java/machine/beam/pom.xml @@ -0,0 +1,103 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <artifactId>machine</artifactId> + <groupId>org.apache.tinkerpop</groupId> + <version>4.0.0-SNAPSHOT</version> + </parent> + <name>Apache TinkerPop (Java) :: Machine :: Beam</name> + <artifactId>beam</artifactId> + <dependencies> + <dependency> + <groupId>org.apache.tinkerpop</groupId> + <artifactId>core</artifactId> + <version>4.0.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-core</artifactId> + <version>2.11.0</version> + <!-- APACHE BEAM DOES NOT USE ENFORCER PLUGIN --> + <exclusions> + <exclusion> + <groupId>org.tukaani</groupId> + <artifactId>xz</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </exclusion> + <exclusion> + <groupId>org.xerial.snappy</groupId> + <artifactId>snappy-java</artifactId> + </exclusion> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + <version>2.9.8</version> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>1.7.25</version> + </dependency> + <dependency> + <groupId>org.xerial.snappy</groupId> + <artifactId>snappy-java</artifactId> + <version>1.1.1.7</version> + </dependency> + <dependency> + <groupId>org.tukaani</groupId> + <artifactId>xz</artifactId> + <version>1.8</version> + </dependency> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-direct-java</artifactId> + <version>2.11.0</version> + <scope>runtime</scope> + </dependency> + </dependencies> + <build> + <directory>${basedir}/target</directory> + <finalName>${project.artifactId}-${project.version}</finalName> + <testResources> + <testResource> + <directory>${basedir}/src/test/resources + </directory> + </testResource> + </testResources> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-failsafe-plugin</artifactId> + </plugin> + </plugins> + </build> +</project> \ No newline at end of file diff --git a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java new file mode 100644 index 0000000..a03ee71 --- /dev/null +++ b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.machine.beam; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.tinkerpop.machine.bytecode.Bytecode; +import org.apache.tinkerpop.machine.bytecode.BytecodeUtil; +import org.apache.tinkerpop.machine.coefficients.LongCoefficient; +import org.apache.tinkerpop.machine.functions.CFunction; +import org.apache.tinkerpop.machine.functions.FilterFunction; +import org.apache.tinkerpop.machine.functions.MapFunction; +import org.apache.tinkerpop.machine.functions.initial.InjectInitial; +import org.apache.tinkerpop.machine.processor.Processor; +import org.apache.tinkerpop.machine.traversers.CompleteTraverser; +import org.apache.tinkerpop.machine.traversers.Traverser; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public class Beam<C, S, E> implements Processor<C, S, E> { + + final Pipeline pipeline; + PCollection collection; + public static List<Traverser> OUTPUT = new ArrayList<>(); + Iterator<Traverser> iterator = null; + + public Beam(final Bytecode<C> bytecode) { + this.pipeline = Pipeline.create(); + this.pipeline.getCoderRegistry().registerCoderForClass(Traverser.class, new TraverserCoder<>()); + + for (final CFunction<?> function : BytecodeUtil.compile(bytecode)) { + if (function instanceof InjectInitial) { + final List<Traverser<C, S>> objects = new ArrayList<>(); + final Iterator<S> iterator = ((InjectInitial) function).get(); + while (iterator.hasNext()) + objects.add(new CompleteTraverser(LongCoefficient.create(), iterator.next())); + this.collection = this.pipeline.apply(Create.of(objects).withCoder(new TraverserCoder<>())); + } else if (function instanceof FilterFunction) { + collection = (PCollection) collection.apply(ParDo.of(new FilterFn<>((FilterFunction<C, S>) function))); + collection.setCoder(new TraverserCoder()); + } else if (function instanceof MapFunction) { + collection = (PCollection) collection.apply(ParDo.of(new MapFn<>((MapFunction<C, S, E>) function))); + collection.setCoder(new TraverserCoder()); + } else + throw new RuntimeException("You need a new step type:" + function); + } + collection = (PCollection) collection.apply(ParDo.of(new OutputStep())); + + } + + @Override + public void addStart(Traverser<C, S> traverser) { + + } + + @Override + public Traverser<C, E> next() { + if (null == this.iterator) { + pipeline.run().waitUntilFinish(); + this.iterator = OUTPUT.iterator(); + } + return this.iterator.next(); + } + + @Override + public boolean hasNext() { + if (null == this.iterator) { + pipeline.run().waitUntilFinish(); + this.iterator = OUTPUT.iterator(); + } + return this.iterator.hasNext(); + } + + @Override + public void reset() { + + } + + @Override + public String toString() { + return this.pipeline.toString(); + } +} diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/MapFunction.java b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BeamProcessor.java similarity index 68% copy from java/core/src/main/java/org/apache/tinkerpop/machine/functions/MapFunction.java copy to java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BeamProcessor.java index f5b5302..7da3f47 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/MapFunction.java +++ b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BeamProcessor.java @@ -16,15 +16,19 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.tinkerpop.machine.functions; +package org.apache.tinkerpop.machine.beam; -import org.apache.tinkerpop.machine.traversers.Traverser; - -import java.util.function.Function; +import org.apache.tinkerpop.machine.bytecode.Bytecode; +import org.apache.tinkerpop.machine.processor.Processor; +import org.apache.tinkerpop.machine.processor.ProcessorFactory; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public interface MapFunction<C, S, E> extends Function<Traverser<C, S>, Traverser<C, E>>, CFunction<C> { +public class BeamProcessor implements ProcessorFactory { + @Override + public <C, S, E> Processor<C, S, E> mint(final Bytecode<C> bytecode) { + return new Beam<>(bytecode); + } } diff --git a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/FilterStep.java b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/FilterFn.java similarity index 64% copy from java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/FilterStep.java copy to java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/FilterFn.java index eb1d1e2..a3d1f9d 100644 --- a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/FilterStep.java +++ b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/FilterFn.java @@ -16,27 +16,26 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.tinkerpop.machine.pipes; +package org.apache.tinkerpop.machine.beam; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.tinkerpop.machine.functions.FilterFunction; import org.apache.tinkerpop.machine.traversers.Traverser; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public class FilterStep<C, S> extends AbstractStep<C, S, S> { +public class FilterFn<C, S> extends DoFn<Traverser<C, S>, Traverser<C, S>> { - public FilterStep(final AbstractStep<C, ?, S> previousStep, final FilterFunction<C, S> filterFunction) { - super(previousStep, filterFunction); + private FilterFunction<C, S> filterFunction; + + public FilterFn(final FilterFunction<C, S> filterFunction) { + this.filterFunction = filterFunction; } - @Override - public Traverser<C, S> next() { - Traverser<C, S> traverser; - while (true) { - traverser = this.processNextTraverser(); - if (((FilterFunction<C, S>) this.function).test(traverser)) - return traverser; - } + @ProcessElement + public void processElement(final @Element Traverser<C, S> traverser, final OutputReceiver<Traverser<C, S>> output) { + if (traverser.filter(this.filterFunction)) + output.output(traverser); } -} +} \ No newline at end of file diff --git a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/MapStep.java b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/MapFn.java similarity index 64% copy from java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/MapStep.java copy to java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/MapFn.java index 1216491..2b83ab2 100644 --- a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/MapStep.java +++ b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/MapFn.java @@ -16,22 +16,26 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.tinkerpop.machine.pipes; +package org.apache.tinkerpop.machine.beam; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.tinkerpop.machine.functions.MapFunction; import org.apache.tinkerpop.machine.traversers.Traverser; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public class MapStep<C, S, E> extends AbstractStep<C, S, E> { +public class MapFn<C, S, E> extends DoFn<Traverser<C, S>, Traverser<C, E>> { - public MapStep(final AbstractStep<C, ?, S> previousStep, final MapFunction<C, S, E> mapFunction) { - super(previousStep, mapFunction); + private final MapFunction<C, S, E> mapFunction; + + public MapFn(final MapFunction<C, S, E> mapFunction) { + this.mapFunction = mapFunction; } - @Override - public Traverser<C, E> next() { - return ((MapFunction<C, S, E>) this.function).apply(super.processNextTraverser()); + @ProcessElement + public void processElement(final @Element Traverser<C, S> traverser, final OutputReceiver<Traverser<C, E>> output) { + output.output(traverser.map(this.mapFunction)); } } diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/MapFunction.java b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/OutputStep.java similarity index 71% copy from java/core/src/main/java/org/apache/tinkerpop/machine/functions/MapFunction.java copy to java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/OutputStep.java index f5b5302..8dddaea 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/MapFunction.java +++ b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/OutputStep.java @@ -16,15 +16,23 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.tinkerpop.machine.functions; +package org.apache.tinkerpop.machine.beam; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.tinkerpop.machine.traversers.Traverser; -import java.util.function.Function; - /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public interface MapFunction<C, S, E> extends Function<Traverser<C, S>, Traverser<C, E>>, CFunction<C> { +public class OutputStep<C, S> extends DoFn<Traverser<C, S>, String> { + + + public OutputStep() { + + } -} + @ProcessElement + public void processElement(final @Element Traverser<C, S> traverser, final OutputReceiver<String> output) { + Beam.OUTPUT.add(traverser); + } +} \ No newline at end of file diff --git a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/TraverserCoder.java b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/TraverserCoder.java new file mode 100644 index 0000000..f35cdd1 --- /dev/null +++ b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/TraverserCoder.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.machine.beam; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.tinkerpop.machine.coefficients.LongCoefficient; +import org.apache.tinkerpop.machine.traversers.CompleteTraverser; +import org.apache.tinkerpop.machine.traversers.Traverser; + +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.OutputStream; +import java.util.Collections; +import java.util.List; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public class TraverserCoder<C, S> extends Coder<Traverser<C, S>> { + + @Override + public void encode(final Traverser<C, S> value, final OutputStream outStream) throws CoderException, IOException { + ObjectOutputStream outputStream = new ObjectOutputStream(outStream); + outputStream.writeObject(value.object()); + } + + @Override + public Traverser<C, S> decode(InputStream inStream) throws CoderException, IOException { + try { + ObjectInputStream inputStream = new ObjectInputStream(inStream); + return new CompleteTraverser(LongCoefficient.create(), inputStream.readObject()); + } catch (final ClassNotFoundException e) { + throw new IOException(e.getMessage(), e); + } + } + + @Override + public List<? extends Coder<?>> getCoderArguments() { + return Collections.emptyList(); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + + } +} diff --git a/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java b/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java similarity index 74% copy from java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java copy to java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java index 38d0df1..1a84253 100644 --- a/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java +++ b/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java @@ -16,27 +16,30 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.tinkerpop.machine.pipes; +package org.apache.tinkerpop.machine.beam; import org.apache.tinkerpop.language.Gremlin; import org.apache.tinkerpop.language.Traversal; import org.apache.tinkerpop.language.TraversalSource; import org.apache.tinkerpop.language.TraversalUtil; -import org.apache.tinkerpop.language.__; import org.apache.tinkerpop.machine.coefficients.LongCoefficient; import org.junit.jupiter.api.Test; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public class PipesTest { - +public class BeamTest { @Test public void shouldWork() { final TraversalSource<Long> g = Gremlin.<Long>traversal() .coefficient(LongCoefficient.create()) - .processor(PipesProcessor.class); - final Traversal<Long, Long, Long> traversal = g.inject(7L, 10L, 12L).as("a").c(3L).map(__.incr()).identity().incr(); + .processor(BeamProcessor.class); + Traversal<Long, Long, Long> traversal = g.inject(7L, 10L, 12L).as("a").c(3L).identity().incr(); + System.out.println(TraversalUtil.getBytecode(traversal)); + System.out.println(traversal); + System.out.println(traversal.toList()); + System.out.println("\n----------\n"); + traversal = g.inject(7L, 10L, 12L).as("a").c(3L).identity().incr().is(44L);//.count(); System.out.println(TraversalUtil.getBytecode(traversal)); System.out.println(traversal); System.out.println(traversal.toList()); diff --git a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/AbstractStep.java b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/AbstractStep.java index 3136c38..ac6f73e 100644 --- a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/AbstractStep.java +++ b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/AbstractStep.java @@ -48,7 +48,7 @@ public abstract class AbstractStep<C, S, E> implements Step<C, S, E> { @Override public abstract Traverser<C, E> next(); - protected Traverser<C, S> processNextTraverser() { + protected Traverser<C, S> getPreviousTraverser() { if (!this.traverserSet.isEmpty()) return this.traverserSet.remove(); else diff --git a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/EmptyStep.java b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/EmptyStep.java index c1dff07..a93ea13 100644 --- a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/EmptyStep.java +++ b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/EmptyStep.java @@ -18,7 +18,7 @@ */ package org.apache.tinkerpop.machine.pipes; -import org.apache.tinkerpop.machine.traversers.Traverser; +import org.apache.tinkerpop.machine.traversers.CompleteTraverser; import org.apache.tinkerpop.util.FastNoSuchElementException; /** @@ -38,7 +38,7 @@ public final class EmptyStep<C, S, E> extends AbstractStep<C, S, E> { } @Override - public Traverser<C, E> next() { + public CompleteTraverser<C, E> next() { throw FastNoSuchElementException.instance(); } diff --git a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/FilterStep.java b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/FilterStep.java index eb1d1e2..aeee847 100644 --- a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/FilterStep.java +++ b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/FilterStep.java @@ -26,17 +26,44 @@ import org.apache.tinkerpop.machine.traversers.Traverser; */ public class FilterStep<C, S> extends AbstractStep<C, S, S> { + private final FilterFunction<C, S> filterFunction; + private Traverser<C, S> nextTraverser = null; + public FilterStep(final AbstractStep<C, ?, S> previousStep, final FilterFunction<C, S> filterFunction) { super(previousStep, filterFunction); + this.filterFunction = filterFunction; } @Override public Traverser<C, S> next() { - Traverser<C, S> traverser; - while (true) { - traverser = this.processNextTraverser(); - if (((FilterFunction<C, S>) this.function).test(traverser)) - return traverser; + if (null != this.nextTraverser) { + final Traverser<C, S> traverser = this.nextTraverser; + this.nextTraverser = null; + return traverser; + } else { + Traverser<C, S> traverser; + while (true) { + traverser = this.getPreviousTraverser(); + if (traverser.filter(this.filterFunction)) + return traverser; + } + } + } + + @Override + public boolean hasNext() { + if (null != this.nextTraverser) + return true; + else { + Traverser<C, S> traverser; + while (super.hasNext()) { + traverser = this.getPreviousTraverser(); + if (traverser.filter(this.filterFunction)) { + this.nextTraverser = traverser; + return true; + } + } + return false; } } } diff --git a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/InitialStep.java b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/InitialStep.java index 31d3c3e..ef4e075 100644 --- a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/InitialStep.java +++ b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/InitialStep.java @@ -20,6 +20,7 @@ package org.apache.tinkerpop.machine.pipes; import org.apache.tinkerpop.machine.functions.InitialFunction; import org.apache.tinkerpop.machine.traversers.Traverser; +import org.apache.tinkerpop.machine.traversers.TraverserFactory; import java.util.Collections; import java.util.Iterator; @@ -29,11 +30,13 @@ import java.util.Iterator; */ public class InitialStep<C, S> extends AbstractStep<C, S, S> { - private Iterator<Traverser<C, S>> objects; + private Iterator<S> objects; + private final TraverserFactory<C, S> traverserFactory; - public InitialStep(final InitialFunction<C, S> initialFunction) { + public InitialStep(final InitialFunction<C, S> initialFunction, final TraverserFactory<C, S> traverserFactory) { super(EmptyStep.instance(), initialFunction); this.objects = initialFunction.get(); + this.traverserFactory = traverserFactory; } @Override @@ -43,7 +46,7 @@ public class InitialStep<C, S> extends AbstractStep<C, S, S> { @Override public Traverser<C, S> next() { - return this.objects.next(); + return this.traverserFactory.create(this.function.coefficient(), this.objects.next()); } @Override diff --git a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/MapStep.java b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/MapStep.java index 1216491..fa8b49f 100644 --- a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/MapStep.java +++ b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/MapStep.java @@ -26,12 +26,15 @@ import org.apache.tinkerpop.machine.traversers.Traverser; */ public class MapStep<C, S, E> extends AbstractStep<C, S, E> { + private final MapFunction<C, S, E> mapFunction; + public MapStep(final AbstractStep<C, ?, S> previousStep, final MapFunction<C, S, E> mapFunction) { super(previousStep, mapFunction); + this.mapFunction = mapFunction; } @Override public Traverser<C, E> next() { - return ((MapFunction<C, S, E>) this.function).apply(super.processNextTraverser()); + return super.getPreviousTraverser().map(this.mapFunction); } } diff --git a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java index bc5028d..b38ecd4 100644 --- a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java +++ b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java @@ -25,8 +25,11 @@ import org.apache.tinkerpop.machine.functions.FilterFunction; import org.apache.tinkerpop.machine.functions.InitialFunction; import org.apache.tinkerpop.machine.functions.MapFunction; import org.apache.tinkerpop.machine.functions.NestedFunction; +import org.apache.tinkerpop.machine.functions.ReduceFunction; +import org.apache.tinkerpop.machine.functions.reduce.BasicReducer; import org.apache.tinkerpop.machine.processor.Processor; import org.apache.tinkerpop.machine.traversers.Traverser; +import org.apache.tinkerpop.machine.traversers.TraverserFactory; import java.util.ArrayList; import java.util.List; @@ -40,18 +43,21 @@ public class Pipes<C, S, E> implements Processor<C, S, E> { private Step<C, ?, E> endStep; private Step<C, S, ?> startStep = EmptyStep.instance(); - private Pipes(final List<CFunction<C>> functions) { + private Pipes(final List<CFunction<C>> functions, final TraverserFactory<C, S> traverserFactory) { AbstractStep<C, ?, ?> previousStep = EmptyStep.instance(); for (final CFunction<?> function : functions) { if (function instanceof NestedFunction) - ((NestedFunction<C, ?, ?>) function).setProcessor(new Pipes<>(((NestedFunction<C, ?, ?>) function).getFunctions())); + ((NestedFunction<C, ?, ?>) function).setProcessor(new Pipes(((NestedFunction<C, S, E>) function).getFunctions(), traverserFactory)); final AbstractStep nextStep; if (function instanceof FilterFunction) nextStep = new FilterStep(previousStep, (FilterFunction<C, ?>) function); else if (function instanceof MapFunction) nextStep = new MapStep(previousStep, (MapFunction<C, ?, ?>) function); else if (function instanceof InitialFunction) - nextStep = new InitialStep<>((InitialFunction<C, S>) function); + // TODO: traverser factory + nextStep = new InitialStep((InitialFunction<C, S>) function, traverserFactory); + else if (function instanceof ReduceFunction) + nextStep = new ReduceStep(previousStep, (ReduceFunction<C, ?, ?>) function, new BasicReducer<>(((ReduceFunction<C, ?, ?>) function).getInitialValue()), traverserFactory); else throw new RuntimeException("You need a new step type:" + function); @@ -65,7 +71,7 @@ public class Pipes<C, S, E> implements Processor<C, S, E> { } public Pipes(final Bytecode<C> bytecode) { - this(BytecodeUtil.compile(BytecodeUtil.optimize(bytecode))); + this(BytecodeUtil.compile(BytecodeUtil.optimize(bytecode)), bytecode.getTraverserFactory()); } @Override diff --git a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/ReduceStep.java b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/ReduceStep.java new file mode 100644 index 0000000..574f2b9 --- /dev/null +++ b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/ReduceStep.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.machine.pipes; + +import org.apache.tinkerpop.machine.functions.ReduceFunction; +import org.apache.tinkerpop.machine.functions.reduce.Reducer; +import org.apache.tinkerpop.machine.traversers.Traverser; +import org.apache.tinkerpop.machine.traversers.TraverserFactory; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public class ReduceStep<C, S, E> extends AbstractStep<C, S, E> { + + private final ReduceFunction<C, S, E> reduceFunction; + private final Reducer<E> reducer; + private final TraverserFactory<C, E> traverserFactory; + private boolean done = false; + + public ReduceStep(final AbstractStep<C, ?, S> previousStep, + final ReduceFunction<C, S, E> reduceFunction, + final Reducer<E> reducer, + final TraverserFactory<C, E> traverserFactory) { + super(previousStep, reduceFunction); + this.reduceFunction = reduceFunction; + this.reducer = reducer; + this.traverserFactory = traverserFactory; + } + + @Override + public Traverser<C, E> next() { + this.done = true; + Traverser<C, S> traverser = null; + while (this.hasNext()) { + traverser = getPreviousTraverser(); + this.reducer.update(this.reduceFunction.apply(traverser, this.reducer.get())); + } + return null == traverser ? + this.traverserFactory.create(this.function.coefficient(), this.reduceFunction.getInitialValue()) : + traverser.reduce(this.reducer); + } + + @Override + public boolean hasNext() { + return !this.done; + } +} diff --git a/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java b/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java index 38d0df1..8a7fd48 100644 --- a/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java +++ b/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java @@ -36,7 +36,12 @@ public class PipesTest { final TraversalSource<Long> g = Gremlin.<Long>traversal() .coefficient(LongCoefficient.create()) .processor(PipesProcessor.class); - final Traversal<Long, Long, Long> traversal = g.inject(7L, 10L, 12L).as("a").c(3L).map(__.incr()).identity().incr(); + Traversal<Long, Long, Long> traversal = g.inject(7L, 10L, 12L).as("a").c(3L).map(__.incr()).identity().incr(); + System.out.println(TraversalUtil.getBytecode(traversal)); + System.out.println(traversal); + System.out.println(traversal.toList()); + System.out.println("\n----------\n"); + traversal = g.inject(7L, 10L, 12L).as("a").c(3L).map(__.incr()).identity().incr().is(44L).count(); System.out.println(TraversalUtil.getBytecode(traversal)); System.out.println(traversal); System.out.println(traversal.toList()); diff --git a/java/machine/pom.xml b/java/machine/pom.xml index 8aef974..8fa7e4f 100644 --- a/java/machine/pom.xml +++ b/java/machine/pom.xml @@ -26,5 +26,6 @@ limitations under the License. <artifactId>machine</artifactId> <modules> <module>pipes</module> + <module>beam</module> </modules> </project> \ No newline at end of file diff --git a/java/pom.xml b/java/pom.xml index 6ee5dd1..3887958 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -170,7 +170,8 @@ limitations under the License. <artifactId>maven-surefire-plugin</artifactId> <version>2.22.1</version> <configuration> - <argLine>-Dlog4j.configuration=${log4j-test.properties} -Dbuild.dir=${project.build.directory} -Dis.testing=true + <argLine>-Dlog4j.configuration=${log4j-test.properties} -Dbuild.dir=${project.build.directory} + -Dis.testing=true </argLine> <excludes> <exclude>**/*IntegrateTest.java</exclude>