This is an automated email from the ASF dual-hosted git repository. okram pushed a commit to branch tp4 in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit 4ee2c36b57d1f1fdb888d6c32ffdf5c3fedb4ac1 Author: Marko A. Rodriguez <okramma...@gmail.com> AuthorDate: Mon Mar 11 09:15:37 2019 -0600 have ReducerFn working in Apache Beam. Now we have count(). This means that both Pipes and Beam machines are at the same level of completion. --- .../tinkerpop/machine/bytecode/Bytecode.java | 6 +- .../machine/functions/ReduceFunction.java | 2 + .../machine/functions/reduce/CountReduce.java | 5 ++ .../machine/functions/reduce/Reducer.java | 4 +- .../traversers/CompleteTraverserFactory.java | 5 +- .../machine/traversers/TraverserFactory.java | 6 +- .../tinkerpop/machine/beam/BasicAccumulator.java} | 51 ++++++++-------- .../org/apache/tinkerpop/machine/beam/Beam.java | 20 +++++-- .../java/org/apache/tinkerpop/machine/beam/Fn.java | 2 + .../apache/tinkerpop/machine/beam/ReduceFn.java | 68 +++++++++++++++++++--- .../tinkerpop/machine/beam/ReducerCoder.java | 62 ++++++++++++++++++++ .../apache/tinkerpop/machine/beam/BeamTest.java | 2 +- .../tinkerpop/machine/pipes/InitialStep.java | 4 +- .../org/apache/tinkerpop/machine/pipes/Pipes.java | 2 +- .../apache/tinkerpop/machine/pipes/ReduceStep.java | 4 +- 15 files changed, 191 insertions(+), 52 deletions(-) diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/Bytecode.java b/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/Bytecode.java index 3310070..74d4d3e 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/Bytecode.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/Bytecode.java @@ -31,8 +31,8 @@ import java.util.List; */ public class Bytecode<C> implements Cloneable { - public List<Strategy> strategies = new ArrayList<>(); - public List<Instruction<C>> instructions = new ArrayList<>(); + private List<Strategy> strategies = new ArrayList<>(); + private List<Instruction<C>> instructions = new ArrayList<>(); public void addStrategy(final Strategy strategy) { @@ -63,7 +63,7 @@ public class Bytecode<C> 
implements Cloneable { } // this should be part of processor! - public <S> TraverserFactory<C, S> getTraverserFactory() { + public <S> TraverserFactory<C> getTraverserFactory() { return new CompleteTraverserFactory<>(); } diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/ReduceFunction.java b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/ReduceFunction.java index c2ceb0d..eac368d 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/ReduceFunction.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/ReduceFunction.java @@ -28,5 +28,7 @@ import java.util.function.BiFunction; public interface ReduceFunction<C, S, E> extends BiFunction<Traverser<C, S>, E, E>, CFunction<C> { public E getInitialValue(); + + public E merge(final E valueA, final E valueB); } diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/CountReduce.java b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/CountReduce.java index 8c674c3..65dde8e 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/CountReduce.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/CountReduce.java @@ -40,6 +40,11 @@ public class CountReduce<C, S> extends AbstractFunction<C, S, Long> implements R } @Override + public Long merge(final Long valueA, final Long valueB) { + return valueA + valueB; + } + + @Override public Long getInitialValue() { return 0L; } diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/Reducer.java b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/Reducer.java index 8434ca7..8103459 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/Reducer.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/Reducer.java @@ -18,10 +18,12 @@ */ package org.apache.tinkerpop.machine.functions.reduce; +import 
java.io.Serializable; + /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public interface Reducer<S> { +public interface Reducer<S> extends Serializable { public S get(); diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/CompleteTraverserFactory.java b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/CompleteTraverserFactory.java index 7d9356f..59ed747 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/CompleteTraverserFactory.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/CompleteTraverserFactory.java @@ -23,9 +23,10 @@ import org.apache.tinkerpop.machine.coefficients.Coefficient; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public class CompleteTraverserFactory<C, S> implements TraverserFactory<C, S> { +public class CompleteTraverserFactory<C> implements TraverserFactory<C> { + @Override - public Traverser<C, S> create(final Coefficient<C> coefficient, final S object) { + public <S> Traverser<C, S> create(final Coefficient<C> coefficient, final S object) { return new CompleteTraverser<>(coefficient.clone(), object); } } diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/TraverserFactory.java b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/TraverserFactory.java index 22f2911..0160e30 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/TraverserFactory.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/TraverserFactory.java @@ -20,10 +20,12 @@ package org.apache.tinkerpop.machine.traversers; import org.apache.tinkerpop.machine.coefficients.Coefficient; +import java.io.Serializable; + /** * @author Marko A. 
Rodriguez (http://markorodriguez.com) */ -public interface TraverserFactory<C, S> { +public interface TraverserFactory<C> extends Serializable { - public Traverser<C, S> create(final Coefficient<C> coefficient, final S object); + public <S> Traverser<C, S> create(final Coefficient<C> coefficient, final S object); } diff --git a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/ReduceStep.java b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BasicAccumulator.java similarity index 51% copy from java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/ReduceStep.java copy to java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BasicAccumulator.java index 574f2b9..0c05d7b 100644 --- a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/ReduceStep.java +++ b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BasicAccumulator.java @@ -16,48 +16,49 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.tinkerpop.machine.pipes; +package org.apache.tinkerpop.machine.beam; +import org.apache.beam.sdk.coders.DefaultCoder; +import org.apache.beam.sdk.transforms.Combine; import org.apache.tinkerpop.machine.functions.ReduceFunction; -import org.apache.tinkerpop.machine.functions.reduce.Reducer; import org.apache.tinkerpop.machine.traversers.Traverser; import org.apache.tinkerpop.machine.traversers.TraverserFactory; +import java.io.Serializable; + /** * @author Marko A. 
Rodriguez (http://markorodriguez.com) */ -public class ReduceStep<C, S, E> extends AbstractStep<C, S, E> { +@DefaultCoder(ReducerCoder.class) +public class BasicAccumulator<C, S, E> implements Combine.AccumulatingCombineFn.Accumulator<Traverser<C, S>, BasicAccumulator<C, S, E>, Traverser<C, E>>, Serializable { + private E value; private final ReduceFunction<C, S, E> reduceFunction; - private final Reducer<E> reducer; - private final TraverserFactory<C, E> traverserFactory; - private boolean done = false; - - public ReduceStep(final AbstractStep<C, ?, S> previousStep, - final ReduceFunction<C, S, E> reduceFunction, - final Reducer<E> reducer, - final TraverserFactory<C, E> traverserFactory) { - super(previousStep, reduceFunction); + private final TraverserFactory<C> traverserFactory; + + public BasicAccumulator(final ReduceFunction<C, S, E> reduceFunction, final TraverserFactory<C> traverserFactory) { + super(); + this.value = reduceFunction.getInitialValue(); this.reduceFunction = reduceFunction; - this.reducer = reducer; this.traverserFactory = traverserFactory; } + public void setValue(final E value) { + this.value = value; + } + + @Override + public void addInput(final Traverser<C, S> input) { + this.value = reduceFunction.apply(input, this.value); + } + @Override - public Traverser<C, E> next() { - this.done = true; - Traverser<C, S> traverser = null; - while (this.hasNext()) { - traverser = getPreviousTraverser(); - this.reducer.update(this.reduceFunction.apply(traverser, this.reducer.get())); - } - return null == traverser ? 
- this.traverserFactory.create(this.function.coefficient(), this.reduceFunction.getInitialValue()) : - traverser.reduce(this.reducer); + public void mergeAccumulator(final BasicAccumulator<C, S, E> other) { + this.value = this.reduceFunction.apply(this.traverserFactory.create(this.reduceFunction.coefficient(), (S) this.value), other.value); } @Override - public boolean hasNext() { - return !this.done; + public Traverser<C, E> extractOutput() { + return this.traverserFactory.create(this.reduceFunction.coefficient(), this.value); } } diff --git a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java index f2813b4..0040bf7 100644 --- a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java +++ b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java @@ -20,6 +20,7 @@ package org.apache.tinkerpop.machine.beam; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; @@ -34,6 +35,7 @@ import org.apache.tinkerpop.machine.functions.MapFunction; import org.apache.tinkerpop.machine.functions.ReduceFunction; import org.apache.tinkerpop.machine.processor.Processor; import org.apache.tinkerpop.machine.traversers.CompleteTraverser; +import org.apache.tinkerpop.machine.traversers.CompleteTraverserFactory; import org.apache.tinkerpop.machine.traversers.Traverser; import java.util.ArrayList; @@ -47,7 +49,7 @@ public class Beam<C, S, E> implements Processor<C, S, E> { private final Pipeline pipeline; public static List<Traverser> OUTPUT = new ArrayList<>(); // FIX THIS! 
- private final List<DoFn> functions = new ArrayList<>(); + private final List<Fn> functions = new ArrayList<>(); Iterator<Traverser> iterator = null; public Beam(final List<CFunction<C>> functions) { @@ -65,14 +67,22 @@ public class Beam<C, S, E> implements Processor<C, S, E> { } else if (function instanceof MapFunction) { fn = new MapFn<>((MapFunction) function); } else if (function instanceof ReduceFunction) { - //fn = new ReduceFn<>((ReduceFunction)function) + final ReduceFn combine = new ReduceFn<>((ReduceFunction) function, new CompleteTraverserFactory<>()); + collection = (PCollection) collection.apply(Combine.globally(combine)); + this.functions.add(combine); } else throw new RuntimeException("You need a new step type:" + function); - this.functions.add(fn); - collection = (PCollection) collection.apply(ParDo.of(fn)); + + if (!(function instanceof ReduceFunction)) { + this.functions.add((Fn) fn); + collection = (PCollection) collection.apply(ParDo.of(fn)); + + } collection.setCoder(new TraverserCoder()); + + } - collection = (PCollection) collection.apply(ParDo.of(new OutputStep())); + collection.apply(ParDo.of(new OutputStep())); this.pipeline.getOptions().setRunner(new PipelineOptions.DirectRunner().create(this.pipeline.getOptions())); } diff --git a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Fn.java b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Fn.java index 1967b96..ca0a357 100644 --- a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Fn.java +++ b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Fn.java @@ -20,6 +20,8 @@ package org.apache.tinkerpop.machine.beam; import org.apache.tinkerpop.machine.traversers.Traverser; +import java.io.Serializable; + /** * @author Marko A. 
Rodriguez (http://markorodriguez.com) */ diff --git a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/ReduceFn.java b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/ReduceFn.java index 37a46ee..2e9353d 100644 --- a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/ReduceFn.java +++ b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/ReduceFn.java @@ -18,25 +18,77 @@ */ package org.apache.tinkerpop.machine.beam; +import org.apache.beam.sdk.coders.CannotProvideCoderException; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.transforms.Combine; import org.apache.tinkerpop.machine.functions.ReduceFunction; -import org.apache.tinkerpop.machine.functions.reduce.Reducer; +import org.apache.tinkerpop.machine.traversers.Traverser; import org.apache.tinkerpop.machine.traversers.TraverserFactory; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public class ReduceFn<C, S, E> extends AbstractFn<C, S, E> { +public class ReduceFn<C, S, E> extends Combine.CombineFn<Traverser<C, S>, BasicAccumulator<C, S, E>, Traverser<C, E>> implements Fn<C, S, E> { private final ReduceFunction<C, S, E> reduceFunction; - private final Reducer<E> reducer; - private final TraverserFactory<C, E> traverserFactory; + private final TraverserFactory<C> traverserFactory; + public ReduceFn(final ReduceFunction<C, S, E> reduceFunction, - final Reducer<E> reducer, - final TraverserFactory<C, E> traverserFactory) { - super(reduceFunction); + final TraverserFactory<C> traverserFactory) { + //super(reduceFunction); this.reduceFunction = reduceFunction; - this.reducer = reducer; this.traverserFactory = traverserFactory; } + + + @Override + public void addStart(Traverser<C, S> traverser) { + + } + + + @Override + public BasicAccumulator<C, S, E> createAccumulator() { + return new BasicAccumulator<>(this.reduceFunction, this.traverserFactory); + } + + 
@Override + public BasicAccumulator<C, S, E> addInput(BasicAccumulator<C, S, E> accumulator, Traverser<C, S> input) { + accumulator.addInput(input); + return accumulator; + } + + @Override + public BasicAccumulator<C, S, E> mergeAccumulators(Iterable<BasicAccumulator<C, S, E>> accumulators) { + E value = this.reduceFunction.getInitialValue(); + for (final BasicAccumulator accumulator : accumulators) { + value = this.reduceFunction.merge(value, (E) accumulator.extractOutput().object()); + } + + final BasicAccumulator<C, S, E> accumulator = new BasicAccumulator<>(this.reduceFunction, this.traverserFactory); + accumulator.setValue(value); + return accumulator; + } + + @Override + public Traverser<C, E> extractOutput(BasicAccumulator<C, S, E> accumulator) { + return accumulator.extractOutput(); + } + + @Override + public Coder<BasicAccumulator<C, S, E>> getAccumulatorCoder(CoderRegistry registry, Coder<Traverser<C, S>> inputCoder) throws CannotProvideCoderException { + return new ReducerCoder<>(); + } + + @Override + public Coder<Traverser<C, E>> getDefaultOutputCoder(CoderRegistry registry, Coder<Traverser<C, S>> inputCoder) throws CannotProvideCoderException { + return new TraverserCoder<>(); + } + + @Override + public String toString() { + return this.reduceFunction.toString(); + } } diff --git a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/ReducerCoder.java b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/ReducerCoder.java new file mode 100644 index 0000000..dc154bc --- /dev/null +++ b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/ReducerCoder.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.machine.beam; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; + +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.OutputStream; +import java.util.Collections; +import java.util.List; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public class ReducerCoder<C, S, E> extends Coder<BasicAccumulator<C, S, E>> { + + @Override + public void encode(final BasicAccumulator<C, S, E> value, final OutputStream outStream) throws CoderException, IOException { + ObjectOutputStream outputStream = new ObjectOutputStream(outStream); + outputStream.writeObject(value); + } + + @Override + public BasicAccumulator<C, S, E> decode(InputStream inStream) throws CoderException, IOException { + try { + ObjectInputStream inputStream = new ObjectInputStream(inStream); + return (BasicAccumulator<C, S, E>) inputStream.readObject(); + } catch (final ClassNotFoundException e) { + throw new IOException(e.getMessage(), e); + } + } + + @Override + public List<? 
extends Coder<?>> getCoderArguments() { + return Collections.emptyList(); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + + } +} diff --git a/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java b/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java index 0c30723..39d3786 100644 --- a/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java +++ b/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java @@ -40,7 +40,7 @@ public class BeamTest { System.out.println(traversal); System.out.println(traversal.toList()); System.out.println("\n----------\n"); - traversal = g.inject(7L, 10L, 12L).as("a").c(3L).map(__.incr()).identity().incr().is(9L);//.count(); + traversal = g.inject(7L, 10L, 12L).as("a").c(3L).map(__.incr()).identity().incr().count(); System.out.println(TraversalUtil.getBytecode(traversal)); System.out.println(traversal); System.out.println(traversal.toList()); diff --git a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/InitialStep.java b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/InitialStep.java index ef4e075..0138ed3 100644 --- a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/InitialStep.java +++ b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/InitialStep.java @@ -31,9 +31,9 @@ import java.util.Iterator; public class InitialStep<C, S> extends AbstractStep<C, S, S> { private Iterator<S> objects; - private final TraverserFactory<C, S> traverserFactory; + private final TraverserFactory<C> traverserFactory; - public InitialStep(final InitialFunction<C, S> initialFunction, final TraverserFactory<C, S> traverserFactory) { + public InitialStep(final InitialFunction<C, S> initialFunction, final TraverserFactory<C> traverserFactory) { super(EmptyStep.instance(), initialFunction); this.objects = initialFunction.get(); 
this.traverserFactory = traverserFactory; diff --git a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java index 29f64bf..606e0ad 100644 --- a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java +++ b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java @@ -43,7 +43,7 @@ public class Pipes<C, S, E> implements Processor<C, S, E> { private Step<C, ?, E> endStep; private Step<C, S, ?> startStep = EmptyStep.instance(); - public Pipes(final List<CFunction<C>> functions, final TraverserFactory<C, S> traverserFactory) { + public Pipes(final List<CFunction<C>> functions, final TraverserFactory<C> traverserFactory) { AbstractStep<C, ?, ?> previousStep = EmptyStep.instance(); for (final CFunction<?> function : functions) { if (function instanceof NestedFunction) diff --git a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/ReduceStep.java b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/ReduceStep.java index 574f2b9..19e4cbc 100644 --- a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/ReduceStep.java +++ b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/ReduceStep.java @@ -30,13 +30,13 @@ public class ReduceStep<C, S, E> extends AbstractStep<C, S, E> { private final ReduceFunction<C, S, E> reduceFunction; private final Reducer<E> reducer; - private final TraverserFactory<C, E> traverserFactory; + private final TraverserFactory<C> traverserFactory; private boolean done = false; public ReduceStep(final AbstractStep<C, ?, S> previousStep, final ReduceFunction<C, S, E> reduceFunction, final Reducer<E> reducer, - final TraverserFactory<C, E> traverserFactory) { + final TraverserFactory<C> traverserFactory) { super(previousStep, reduceFunction); this.reduceFunction = reduceFunction; this.reducer = reducer;