This is an automated email from the ASF dual-hosted git repository. okram pushed a commit to branch tp4 in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit 3e72077ba8a418cf36fdbbe0ae691d662056be41 Author: Marko A. Rodriguez <[email protected]> AuthorDate: Mon Mar 11 11:32:16 2019 -0600 got SumReduce working easily. Lots of Traverser tweaks. Added @dkuppitz NumberHelper class from TP3. --- .../org/apache/tinkerpop/language/Symbols.java | 1 + .../org/apache/tinkerpop/language/Traversal.java | 5 + .../tinkerpop/machine/bytecode/Bytecode.java | 7 - .../tinkerpop/machine/bytecode/BytecodeUtil.java | 9 + .../machine/functions/reduce/SumReduce.java} | 32 +- .../machine/traversers/CompleteTraverser.java | 13 +- .../tinkerpop/machine/traversers/Traverser.java | 17 +- .../machine/traversers/TraverserFactory.java | 1 + .../org/apache/tinkerpop/util/NumberHelper.java | 445 +++++++++++++++++++++ .../apache/tinkerpop/machine/beam/AbstractFn.java | 4 +- .../org/apache/tinkerpop/machine/beam/Beam.java | 17 +- .../apache/tinkerpop/machine/beam/InitialFn.java | 9 +- .../org/apache/tinkerpop/machine/beam/MapFn.java | 4 +- .../apache/tinkerpop/machine/beam/ReduceFn.java | 5 +- .../org/apache/tinkerpop/machine/pipes/Pipes.java | 4 +- .../apache/tinkerpop/machine/pipes/ReduceStep.java | 8 +- .../machine/pipes/util}/BasicReducer.java | 2 +- .../tinkerpop/machine/pipes/util}/Reducer.java | 2 +- .../apache/tinkerpop/machine/pipes/PipesTest.java | 5 +- 19 files changed, 525 insertions(+), 65 deletions(-) diff --git a/java/core/src/main/java/org/apache/tinkerpop/language/Symbols.java b/java/core/src/main/java/org/apache/tinkerpop/language/Symbols.java index 2912789..4f5ed5f 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/language/Symbols.java +++ b/java/core/src/main/java/org/apache/tinkerpop/language/Symbols.java @@ -39,4 +39,5 @@ public final class Symbols { public static final String INJECT = "inject"; public static final String MAP = "map"; public static final String PATH = "path"; + public static final String SUM = "sum"; } diff --git a/java/core/src/main/java/org/apache/tinkerpop/language/Traversal.java b/java/core/src/main/java/org/apache/tinkerpop/language/Traversal.java index ba090d9..e9ebb85 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/language/Traversal.java +++ b/java/core/src/main/java/org/apache/tinkerpop/language/Traversal.java @@ -100,6 +100,11 @@ public class Traversal<C, S, E> implements Iterator<E> { return (Traversal) this; } + public <R extends Number> Traversal<C, S, R> sum() { + this.bytecode.addInstruction(this.currentCoefficient, Symbols.SUM); + return (Traversal) this; + } + /////// diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/Bytecode.java b/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/Bytecode.java index 1529a5c..ddc1cf9 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/Bytecode.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/Bytecode.java @@ -19,8 +19,6 @@ package org.apache.tinkerpop.machine.bytecode; import org.apache.tinkerpop.machine.coefficients.Coefficient; -import org.apache.tinkerpop.machine.traversers.CompleteTraverserFactory; -import org.apache.tinkerpop.machine.traversers.TraverserFactory; import java.util.ArrayList; import java.util.List; @@ -60,11 +58,6 @@ public class Bytecode<C> implements Cloneable { return this.instructions.get(this.instructions.size() - 1); } - // this should be part of withProcessor! - public <S> TraverserFactory<C> getTraverserFactory() { - return new CompleteTraverserFactory<>(); - } - @Override public String toString() { return this.instructions.toString(); diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java b/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java index 538825a..61882b7 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java @@ -28,8 +28,11 @@ import org.apache.tinkerpop.machine.functions.map.IncrMap; import org.apache.tinkerpop.machine.functions.map.MapMap; import org.apache.tinkerpop.machine.functions.map.PathMap; import org.apache.tinkerpop.machine.functions.reduce.CountReduce; +import org.apache.tinkerpop.machine.functions.reduce.SumReduce; import org.apache.tinkerpop.machine.processor.ProcessorFactory; import org.apache.tinkerpop.machine.strategies.Strategy; +import org.apache.tinkerpop.machine.traversers.CompleteTraverserFactory; +import org.apache.tinkerpop.machine.traversers.TraverserFactory; import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; @@ -106,6 +109,10 @@ public final class BytecodeUtil { } } + public static <C> Optional<TraverserFactory<C>> getTraverserFactory(final Bytecode<C> bytecode) { + return Optional.of(new CompleteTraverserFactory<C>()); + } + private static <C> CFunction<C> generateFunction(final Instruction<C> instruction) { final String op = instruction.op(); final Coefficient<C> coefficient = instruction.coefficient(); @@ -125,6 +132,8 @@ public final class BytecodeUtil { return new MapMap<>(coefficient, labels, compile((Bytecode<C>) instruction.args()[0])); case Symbols.PATH: return new PathMap<>(coefficient, labels); + case Symbols.SUM: + return new SumReduce<>(coefficient, labels); default: throw new RuntimeException("This is an unknown instruction:" + instruction.op()); } diff --git a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/AbstractFn.java b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/SumReduce.java similarity index 51% copy from java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/AbstractFn.java copy to java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/SumReduce.java index 63caa5e..8963c2f 100644 --- a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/AbstractFn.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/SumReduce.java @@ -16,33 +16,37 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.tinkerpop.machine.beam; +package org.apache.tinkerpop.machine.functions.reduce; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.tinkerpop.machine.functions.CFunction; +import org.apache.tinkerpop.machine.coefficients.Coefficient; +import org.apache.tinkerpop.machine.functions.AbstractFunction; +import org.apache.tinkerpop.machine.functions.ReduceFunction; import org.apache.tinkerpop.machine.traversers.Traverser; -import org.apache.tinkerpop.machine.traversers.TraverserSet; +import org.apache.tinkerpop.util.NumberHelper; + +import java.util.Set; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public abstract class AbstractFn<C, S, E> extends DoFn<Traverser<C, S>, Traverser<C, E>> implements Fn<C, S, E> { - - protected final TraverserSet<C, S> traversers = new TraverserSet<>(); - protected final CFunction<C> function; +public class SumReduce<C, S extends Number> extends AbstractFunction<C, S, S> implements ReduceFunction<C, S, S> { - protected AbstractFn(final CFunction<C> function) { - this.function = function; + public SumReduce(final Coefficient<C> coefficient, final Set<String> labels) { + super(coefficient, labels); } @Override - public void addStart(final Traverser<C, S> traverser) { - this.traversers.add(traverser); + public S apply(final Traverser<C, S> traverser, final S currentValue) { + return (S) NumberHelper.add(currentValue, NumberHelper.mul(traverser.object(), traverser.coefficient().count())); } @Override - public String toString() { - return this.function.toString(); + public S merge(final S valueA, final S valueB) { + return (S) NumberHelper.add(valueA, valueB); } + @Override + public S getInitialValue() { + return (S) NumberHelper.add(0, 0); + } } diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/CompleteTraverser.java b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/CompleteTraverser.java index 81d17fc..d33821b 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/CompleteTraverser.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/CompleteTraverser.java @@ -19,8 +19,9 @@ package org.apache.tinkerpop.machine.traversers; import org.apache.tinkerpop.machine.coefficients.Coefficient; +import org.apache.tinkerpop.machine.functions.CFunction; +import org.apache.tinkerpop.machine.functions.ReduceFunction; -import java.io.Serializable; import java.util.Collections; /** @@ -58,9 +59,13 @@ public class CompleteTraverser<C, S> implements Traverser<C, S> { } @Override - public <E> Traverser<C, E> split(final Coefficient<C> eCoefficient, final E eObject) { - final CompleteTraverser<C, E> clone = new CompleteTraverser<>(eCoefficient, eObject); - clone.path = new Path(this.path); + public <E> Traverser<C, E> split(final CFunction<C> function, final E eObject) { + final CompleteTraverser<C, E> clone = new CompleteTraverser<>( + function instanceof ReduceFunction ? + function.coefficient().clone().unity() : + function.coefficient().clone().multiply(this.coefficient().value()), eObject); + clone.path = function instanceof ReduceFunction ? new Path() : new Path(this.path); + clone.path.add(function.labels(), eObject); return clone; } diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/Traverser.java b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/Traverser.java index 27c9f92..8a3a320 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/Traverser.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/Traverser.java @@ -19,10 +19,11 @@ package org.apache.tinkerpop.machine.traversers; import org.apache.tinkerpop.machine.coefficients.Coefficient; +import org.apache.tinkerpop.machine.functions.CFunction; import org.apache.tinkerpop.machine.functions.FilterFunction; import org.apache.tinkerpop.machine.functions.FlatMapFunction; import org.apache.tinkerpop.machine.functions.MapFunction; -import org.apache.tinkerpop.machine.functions.reduce.Reducer; +import org.apache.tinkerpop.machine.functions.ReduceFunction; import java.io.Serializable; import java.util.Collections; @@ -50,7 +51,6 @@ public interface Traverser<C, S> extends Serializable { public default boolean filter(final FilterFunction<C, S> function) { if (function.test(this)) { - this.coefficient().multiply(function.coefficient().value()); this.addLabels(function.labels()); return true; } else { @@ -59,11 +59,7 @@ public interface Traverser<C, S> extends Serializable { } public default <E> Traverser<C, E> map(final MapFunction<C, S, E> function) { - final Coefficient<C> eCoefficient = this.coefficient().clone().multiply(function.coefficient().value()); - final E eObject = function.apply(this); - final Traverser<C, E> traverser = this.split(eCoefficient, eObject); - traverser.addLabels(function.labels()); - return traverser; + return this.split(function, function.apply(this)); } public default <E> Iterator<Traverser<C, E>> flatMap(final FlatMapFunction<C, S, E> function) { @@ -72,10 +68,9 @@ public interface Traverser<C, S> extends Serializable { //public default void sideEffect(final SideEffectFunction<C,S> function); - public default <E> Traverser<C, E> reduce(final Reducer<E> reducer) { - final Traverser<C, E> traverser = this.split(this.coefficient().clone().unity(), reducer.get()); - return traverser; + public default <E> Traverser<C, E> reduce(final ReduceFunction<C, S, E> function, final E reducedValue) { + return this.split(function, reducedValue); } - public <E> Traverser<C, E> split(final Coefficient<C> coefficient, final E object); + public <E> Traverser<C, E> split(final CFunction<C> function, final E object); } diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/TraverserFactory.java b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/TraverserFactory.java index 0160e30..c64c81e 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/TraverserFactory.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/TraverserFactory.java @@ -28,4 +28,5 @@ import java.io.Serializable; public interface TraverserFactory<C> extends Serializable { public <S> Traverser<C, S> create(final Coefficient<C> coefficient, final S object); + } diff --git a/java/core/src/main/java/org/apache/tinkerpop/util/NumberHelper.java b/java/core/src/main/java/org/apache/tinkerpop/util/NumberHelper.java new file mode 100644 index 0000000..8a2ae11 --- /dev/null +++ b/java/core/src/main/java/org/apache/tinkerpop/util/NumberHelper.java @@ -0,0 +1,445 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.util; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.math.MathContext; +import java.util.function.BiFunction; + +/** + * @author Daniel Kuppitz (http://gremlin.guru) + */ + +public final class NumberHelper { + + static final NumberHelper BYTE_NUMBER_HELPER = new NumberHelper( + (a, b) -> a.byteValue() + b.byteValue(), + (a, b) -> a.byteValue() - b.byteValue(), + (a, b) -> a.byteValue() * b.byteValue(), + (a, b) -> a.byteValue() / b.byteValue(), + (a, b) -> { + if (isNumber(a)) { + if (isNumber(b)) { + final byte x = a.byteValue(), y = b.byteValue(); + return x <= y ? x : y; + } + return a.byteValue(); + } + return b.byteValue(); + }, + (a, b) -> { + if (isNumber(a)) { + if (isNumber(b)) { + final byte x = a.byteValue(), y = b.byteValue(); + return x >= y ? x : y; + } + return a.byteValue(); + } + return b.byteValue(); + }, + (a, b) -> Byte.compare(a.byteValue(), b.byteValue())); + + static final NumberHelper SHORT_NUMBER_HELPER = new NumberHelper( + (a, b) -> a.shortValue() + b.shortValue(), + (a, b) -> a.shortValue() - b.shortValue(), + (a, b) -> a.shortValue() * b.shortValue(), + (a, b) -> a.shortValue() / b.shortValue(), + (a, b) -> { + if (isNumber(a)) { + if (isNumber(b)) { + final short x = a.shortValue(), y = b.shortValue(); + return x <= y ? x : y; + } + return a.shortValue(); + } + return b.shortValue(); + }, + (a, b) -> { + if (isNumber(a)) { + if (isNumber(b)) { + final short x = a.shortValue(), y = b.shortValue(); + return x >= y ? x : y; + } + return a.shortValue(); + } + return b.shortValue(); + }, + (a, b) -> Short.compare(a.shortValue(), b.shortValue())); + + static final NumberHelper INTEGER_NUMBER_HELPER = new NumberHelper( + (a, b) -> a.intValue() + b.intValue(), + (a, b) -> a.intValue() - b.intValue(), + (a, b) -> a.intValue() * b.intValue(), + (a, b) -> a.intValue() / b.intValue(), + (a, b) -> { + if (isNumber(a)) { + if (isNumber(b)) { + final int x = a.intValue(), y = b.intValue(); + return x <= y ? x : y; + } + return a.intValue(); + } + return b.intValue(); + }, + (a, b) -> { + if (isNumber(a)) { + if (isNumber(b)) { + final int x = a.intValue(), y = b.intValue(); + return x >= y ? x : y; + } + return a.intValue(); + } + return b.intValue(); + }, + (a, b) -> Integer.compare(a.intValue(), b.intValue())); + + static final NumberHelper LONG_NUMBER_HELPER = new NumberHelper( + (a, b) -> a.longValue() + b.longValue(), + (a, b) -> a.longValue() - b.longValue(), + (a, b) -> a.longValue() * b.longValue(), + (a, b) -> a.longValue() / b.longValue(), + (a, b) -> { + if (isNumber(a)) { + if (isNumber(b)) { + final long x = a.longValue(), y = b.longValue(); + return x <= y ? x : y; + } + return a.longValue(); + } + return b.longValue(); + }, + (a, b) -> { + if (isNumber(a)) { + if (isNumber(b)) { + final long x = a.longValue(), y = b.longValue(); + return x >= y ? x : y; + } + return a.longValue(); + } + return b.longValue(); + }, + (a, b) -> Long.compare(a.longValue(), b.longValue())); + + static final NumberHelper BIG_INTEGER_NUMBER_HELPER = new NumberHelper( + (a, b) -> bigIntegerValue(a).add(bigIntegerValue(b)), + (a, b) -> bigIntegerValue(a).subtract(bigIntegerValue(b)), + (a, b) -> bigIntegerValue(a).multiply(bigIntegerValue(b)), + (a, b) -> bigIntegerValue(a).divide(bigIntegerValue(b)), + (a, b) -> { + if (isNumber(a)) { + if (isNumber(b)) { + final BigInteger x = bigIntegerValue(a), y = bigIntegerValue(b); + return x.compareTo(y) <= 0 ? x : y; + } + return bigIntegerValue(a); + } + return bigIntegerValue(b); + }, + (a, b) -> { + if (isNumber(a)) { + if (isNumber(b)) { + final BigInteger x = bigIntegerValue(a), y = bigIntegerValue(b); + return x.compareTo(y) >= 0 ? x : y; + } + return bigIntegerValue(a); + } + return bigIntegerValue(b); + }, + (a, b) -> bigIntegerValue(a).compareTo(bigIntegerValue(b))); + + static final NumberHelper FLOAT_NUMBER_HELPER = new NumberHelper( + (a, b) -> a.floatValue() + b.floatValue(), + (a, b) -> a.floatValue() - b.floatValue(), + (a, b) -> a.floatValue() * b.floatValue(), + (a, b) -> a.floatValue() / b.floatValue(), + (a, b) -> { + if (isNumber(a)) { + if (isNumber(b)) { + final float x = a.floatValue(), y = b.floatValue(); + return x <= y ? x : y; + } + return a.floatValue(); + } + return b.floatValue(); + }, + (a, b) -> { + if (isNumber(a)) { + if (isNumber(b)) { + final float x = a.floatValue(), y = b.floatValue(); + return x >= y ? x : y; + } + return a.floatValue(); + } + return b.floatValue(); + }, + (a, b) -> Float.compare(a.floatValue(), b.floatValue())); + + static final NumberHelper DOUBLE_NUMBER_HELPER = new NumberHelper( + (a, b) -> a.doubleValue() + b.doubleValue(), + (a, b) -> a.doubleValue() - b.doubleValue(), + (a, b) -> a.doubleValue() * b.doubleValue(), + (a, b) -> a.doubleValue() / b.doubleValue(), + (a, b) -> { + if (isNumber(a)) { + if (isNumber(b)) { + final double x = a.doubleValue(), y = b.doubleValue(); + return x <= y ? x : y; + } + return a.doubleValue(); + } + return b.doubleValue(); + }, + (a, b) -> { + if (isNumber(a)) { + if (isNumber(b)) { + final double x = a.doubleValue(), y = b.doubleValue(); + return x >= y ? x : y; + } + return a.doubleValue(); + } + return b.doubleValue(); + }, + (a, b) -> Double.compare(a.doubleValue(), b.doubleValue())); + + static final NumberHelper BIG_DECIMAL_NUMBER_HELPER = new NumberHelper( + (a, b) -> bigDecimalValue(a).add(bigDecimalValue(b)), + (a, b) -> bigDecimalValue(a).subtract(bigDecimalValue(b)), + (a, b) -> bigDecimalValue(a).multiply(bigDecimalValue(b)), + (a, b) -> { + final BigDecimal ba = bigDecimalValue(a); + final BigDecimal bb = bigDecimalValue(b); + try { + return ba.divide(bb); + } catch (ArithmeticException ignored) { + // set a default precision + final int precision = Math.max(ba.precision(), bb.precision()) + 10; + BigDecimal result = ba.divide(bb, new MathContext(precision)); + final int scale = Math.max(Math.max(ba.scale(), bb.scale()), 10); + if (result.scale() > scale) result = result.setScale(scale, BigDecimal.ROUND_HALF_UP); + return result; + } + }, + (a, b) -> { + if (isNumber(a)) { + if (isNumber(b)) { + final BigDecimal x = bigDecimalValue(a), y = bigDecimalValue(b); + return x.compareTo(y) <= 0 ? x : y; + } + return bigDecimalValue(a); + } + return bigDecimalValue(b); + }, + (a, b) -> { + if (isNumber(a)) { + if (isNumber(b)) { + final BigDecimal x = bigDecimalValue(a), y = bigDecimalValue(b); + return x.compareTo(y) >= 0 ? x : y; + } + return bigDecimalValue(a); + } + return bigDecimalValue(b); + }, + (a, b) -> bigDecimalValue(a).compareTo(bigDecimalValue(b))); + + public final BiFunction<Number, Number, Number> add; + public final BiFunction<Number, Number, Number> sub; + public final BiFunction<Number, Number, Number> mul; + public final BiFunction<Number, Number, Number> div; + public final BiFunction<Number, Number, Number> min; + public final BiFunction<Number, Number, Number> max; + public final BiFunction<Number, Number, Integer> cmp; + + private NumberHelper(final BiFunction<Number, Number, Number> add, + final BiFunction<Number, Number, Number> sub, + final BiFunction<Number, Number, Number> mul, + final BiFunction<Number, Number, Number> div, + final BiFunction<Number, Number, Number> min, + final BiFunction<Number, Number, Number> max, + final BiFunction<Number, Number, Integer> cmp + ) { + this.add = add; + this.sub = sub; + this.mul = mul; + this.div = div; + this.min = min; + this.max = max; + this.cmp = cmp; + } + + public static Class<? extends Number> getHighestCommonNumberClass(final Number... numbers) { + return getHighestCommonNumberClass(false, numbers); + } + + public static Class<? extends Number> getHighestCommonNumberClass(final boolean forceFloatingPoint, final Number... numbers) { + int bits = 8; + boolean fp = forceFloatingPoint; + for (final Number number : numbers) { + if (!isNumber(number)) continue; + final Class<? extends Number> clazz = number.getClass(); + if (clazz.equals(Byte.class)) continue; + if (clazz.equals(Short.class)) { + bits = bits < 16 ? 16 : bits; + } else if (clazz.equals(Integer.class)) { + bits = bits < 32 ? 32 : bits; + } else if (clazz.equals(Long.class)) { + bits = bits < 64 ? 64 : bits; + } else if (clazz.equals(BigInteger.class)) { + bits = bits < 128 ? 128 : bits; + } else if (clazz.equals(Float.class)) { + bits = bits < 32 ? 32 : bits; + fp = true; + } else if (clazz.equals(Double.class)) { + bits = bits < 64 ? 64 : bits; + fp = true; + } else /*if (clazz.equals(BigDecimal.class))*/ { + bits = bits < 128 ? 128 : bits; + fp = true; + break; // maxed out, no need to check remaining numbers + } + } + return determineNumberClass(bits, fp); + } + + public static Number add(final Number a, final Number b) { + final Class<? extends Number> clazz = getHighestCommonNumberClass(a, b); + return getHelper(clazz).add.apply(a, b); + } + + public static Number sub(final Number a, final Number b) { + final Class<? extends Number> clazz = getHighestCommonNumberClass(a, b); + return getHelper(clazz).sub.apply(a, b); + } + + public static Number mul(final Number a, final Number b) { + final Class<? extends Number> clazz = getHighestCommonNumberClass(a, b); + return getHelper(clazz).mul.apply(a, b); + } + + public static Number div(final Number a, final Number b) { + return div(a, b, false); + } + + public static Number div(final Number a, final Number b, final boolean forceFloatingPoint) { + final Class<? extends Number> clazz = getHighestCommonNumberClass(forceFloatingPoint, a, b); + return getHelper(clazz).div.apply(a, b); + } + + public static Number min(final Number a, final Number b) { + final Class<? extends Number> clazz = getHighestCommonNumberClass(a, b); + return getHelper(clazz).min.apply(a, b); + } + + public static Comparable min(final Comparable a, final Comparable b) { + if (a instanceof Number && b instanceof Number) { + final Number an = (Number) a, bn = (Number) b; + final Class<? extends Number> clazz = getHighestCommonNumberClass(an, bn); + return (Comparable) getHelper(clazz).min.apply(an, bn); + } + return isNonValue(a) ? b : + isNonValue(b) ? a : + a.compareTo(b) < 0 ? a : b; + } + + public static Number max(final Number a, final Number b) { + final Class<? extends Number> clazz = getHighestCommonNumberClass(a, b); + return getHelper(clazz).max.apply(a, b); + } + + public static Comparable max(final Comparable a, final Comparable b) { + if (a instanceof Number && b instanceof Number) { + final Number an = (Number) a, bn = (Number) b; + final Class<? extends Number> clazz = getHighestCommonNumberClass(an, bn); + return (Comparable) getHelper(clazz).max.apply(an, bn); + } + return isNonValue(a) ? b : + isNonValue(b) ? a : + a.compareTo(b) > 0 ? a : b; + } + + public static Integer compare(final Number a, final Number b) { + final Class<? extends Number> clazz = getHighestCommonNumberClass(a, b); + return getHelper(clazz).cmp.apply(a, b); + } + + private static NumberHelper getHelper(final Class<? extends Number> clazz) { + if (clazz.equals(Byte.class)) { + return BYTE_NUMBER_HELPER; + } + if (clazz.equals(Short.class)) { + return SHORT_NUMBER_HELPER; + } + if (clazz.equals(Integer.class)) { + return INTEGER_NUMBER_HELPER; + } + if (clazz.equals(Long.class)) { + return LONG_NUMBER_HELPER; + } + if (clazz.equals(BigInteger.class)) { + return BIG_INTEGER_NUMBER_HELPER; + } + if (clazz.equals(Float.class)) { + return FLOAT_NUMBER_HELPER; + } + if (clazz.equals(Double.class)) { + return DOUBLE_NUMBER_HELPER; + } + if (clazz.equals(BigDecimal.class)) { + return BIG_DECIMAL_NUMBER_HELPER; + } + throw new IllegalArgumentException("Unsupported numeric type: " + clazz); + } + + private static BigInteger bigIntegerValue(final Number number) { + if (number == null) return null; + if (number instanceof BigInteger) return (BigInteger) number; + return BigInteger.valueOf(number.longValue()); + } + + private static BigDecimal bigDecimalValue(final Number number) { + if (number == null) return null; + if (number instanceof BigDecimal) return (BigDecimal) number; + if (number instanceof BigInteger) return new BigDecimal((BigInteger) number); + return (number instanceof Double || number instanceof Float) + ? BigDecimal.valueOf(number.doubleValue()) + : BigDecimal.valueOf(number.longValue()); + } + + private static Class<? extends Number> determineNumberClass(final int bits, final boolean floatingPoint) { + if (floatingPoint) { + if (bits <= 32) return Float.class; + if (bits <= 64) return Double.class; + return BigDecimal.class; + } else { + if (bits <= 8) return Byte.class; + if (bits <= 16) return Short.class; + if (bits <= 32) return Integer.class; + if (bits <= 64) return Long.class; + return BigInteger.class; + } + } + + private static boolean isNumber(final Number number) { + return number != null && !number.equals(Double.NaN); + } + + private static boolean isNonValue(final Object value) { + return value instanceof Double && !isNumber((Double) value); + } +} diff --git a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/AbstractFn.java b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/AbstractFn.java index 63caa5e..c5c6230 100644 --- a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/AbstractFn.java +++ b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/AbstractFn.java @@ -28,7 +28,7 @@ import org.apache.tinkerpop.machine.traversers.TraverserSet; */ public abstract class AbstractFn<C, S, E> extends DoFn<Traverser<C, S>, Traverser<C, E>> implements Fn<C, S, E> { - protected final TraverserSet<C, S> traversers = new TraverserSet<>(); + protected final TraverserSet<C, S> traverserSet = new TraverserSet<>(); protected final CFunction<C> function; protected AbstractFn(final CFunction<C> function) { @@ -37,7 +37,7 @@ public abstract class AbstractFn<C, S, E> extends DoFn<Traverser<C, S>, Traverse @Override public void addStart(final Traverser<C, S> traverser) { - this.traversers.add(traverser); + this.traverserSet.add(traverser); } @Override diff --git a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java index bfe910e..3e36565 100644 --- a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java +++ b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java @@ -27,6 +27,7 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; import org.apache.tinkerpop.machine.bytecode.Bytecode; import org.apache.tinkerpop.machine.bytecode.BytecodeUtil; +import org.apache.tinkerpop.machine.coefficients.Coefficient; import org.apache.tinkerpop.machine.coefficients.LongCoefficient; import org.apache.tinkerpop.machine.functions.CFunction; import org.apache.tinkerpop.machine.functions.FilterFunction; @@ -34,9 +35,8 @@ import org.apache.tinkerpop.machine.functions.InitialFunction; import org.apache.tinkerpop.machine.functions.MapFunction; import org.apache.tinkerpop.machine.functions.ReduceFunction; import org.apache.tinkerpop.machine.processor.Processor; -import org.apache.tinkerpop.machine.traversers.CompleteTraverser; -import org.apache.tinkerpop.machine.traversers.CompleteTraverserFactory; import org.apache.tinkerpop.machine.traversers.Traverser; +import org.apache.tinkerpop.machine.traversers.TraverserFactory; import java.util.ArrayList; import java.util.Iterator; @@ -47,27 +47,29 @@ import java.util.List; */ public class Beam<C, S, E> implements Processor<C, S, E> { + private final Pipeline pipeline; public static List<Traverser> OUTPUT = new ArrayList<>(); // FIX THIS! private final List<Fn> functions = new ArrayList<>(); Iterator<Traverser> iterator = null; - public Beam(final List<CFunction<C>> functions) { + + public Beam(final List<CFunction<C>> functions, final TraverserFactory<C> traverserFactory) { this.pipeline = Pipeline.create(); this.pipeline.getCoderRegistry().registerCoderForClass(Traverser.class, new TraverserCoder<>()); - PCollection collection = this.pipeline.apply(Create.of(new CompleteTraverser(LongCoefficient.create(), 1L))); + PCollection collection = this.pipeline.apply(Create.of(traverserFactory.create((Coefficient) LongCoefficient.create(), 1L))); collection.setCoder(new TraverserCoder()); DoFn fn = null; for (final CFunction<?> function : functions) { if (function instanceof InitialFunction) { - fn = new InitialFn<>((InitialFunction) function); + fn = new InitialFn<>((InitialFunction) function, traverserFactory); } else if (function instanceof FilterFunction) { fn = new FilterFn<>((FilterFunction) function); } else if (function instanceof MapFunction) { fn = new MapFn<>((MapFunction) function); } else if (function instanceof ReduceFunction) { - final ReduceFn combine = new ReduceFn<>((ReduceFunction) function, new CompleteTraverserFactory<>()); + final ReduceFn combine = new ReduceFn<>((ReduceFunction) function, traverserFactory); collection = (PCollection) collection.apply(Combine.globally(combine)); this.functions.add(combine); } else @@ -76,7 +78,6 @@ public class Beam<C, S, E> implements Processor<C, S, E> { if (!(function instanceof ReduceFunction)) { this.functions.add((Fn) fn); collection = (PCollection) collection.apply(ParDo.of(fn)); - } collection.setCoder(new TraverserCoder()); @@ -87,7 +88,7 @@ public class Beam<C, S, E> implements Processor<C, S, E> { } public Beam(final Bytecode<C> bytecode) { - this(BytecodeUtil.compile(BytecodeUtil.strategize(bytecode))); + this(BytecodeUtil.compile(BytecodeUtil.strategize(bytecode)), BytecodeUtil.getTraverserFactory(bytecode).get()); } @Override diff --git a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/InitialFn.java b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/InitialFn.java index 33556e6..c5b5d5e 100644 --- a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/InitialFn.java +++ b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/InitialFn.java @@ -18,10 +18,9 @@ */ package org.apache.tinkerpop.machine.beam; -import org.apache.tinkerpop.machine.coefficients.LongCoefficient; import org.apache.tinkerpop.machine.functions.InitialFunction; -import org.apache.tinkerpop.machine.traversers.CompleteTraverser; import org.apache.tinkerpop.machine.traversers.Traverser; +import org.apache.tinkerpop.machine.traversers.TraverserFactory; import java.util.Iterator; @@ -31,17 +30,19 @@ import java.util.Iterator; public class InitialFn<C, S> extends AbstractFn<C, S, S> { private final InitialFunction<C, S> initialFunction; + private final TraverserFactory traverserFactory; - public InitialFn(final InitialFunction<C, S> initialFunction) { + public InitialFn(final InitialFunction<C, S> initialFunction, final TraverserFactory<C> traverserFactory) { super(initialFunction); this.initialFunction = initialFunction; + this.traverserFactory = traverserFactory; } @ProcessElement public void processElement(final @Element Traverser<C, S> traverser, final OutputReceiver<Traverser<C, S>> output) { final Iterator<S> iterator = this.initialFunction.get(); while (iterator.hasNext()) { - output.output(new CompleteTraverser(LongCoefficient.create(), iterator.next())); + output.output(this.traverserFactory.create(this.initialFunction.coefficient(), iterator.next())); } } } \ No newline at end of file diff --git a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/MapFn.java b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/MapFn.java index 18d916f..296f491 100644 --- a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/MapFn.java +++ b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/MapFn.java @@ -45,8 +45,8 @@ public class MapFn<C, S, E> extends AbstractFn<C, S, E> { if (this.mapFunction instanceof NestedFunction) { Pipes beam = new Pipes(((NestedFunction) this.mapFunction).getFunctions(), new CompleteTraverserFactory()); ((NestedFunction) this.mapFunction).setProcessor(beam); - while (!this.traversers.isEmpty()) { - beam.addStart(this.traversers.remove()); + while (!this.traverserSet.isEmpty()) { + beam.addStart(this.traverserSet.remove()); } } this.first = false; diff --git a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/ReduceFn.java b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/ReduceFn.java index 5a53d13..17d2c3d 100644 --- a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/ReduceFn.java +++ b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/ReduceFn.java @@ -35,8 +35,7 @@ public class ReduceFn<C, S, E> extends Combine.CombineFn<Traverser<C, S>, BasicA private final TraverserFactory<C> traverserFactory; - public ReduceFn(final ReduceFunction<C, S, E> reduceFunction, - final TraverserFactory<C> traverserFactory) { + public ReduceFn(final ReduceFunction<C, S, E> reduceFunction, final TraverserFactory<C> traverserFactory) { this.reduceFunction = reduceFunction; this.traverserFactory = traverserFactory; } @@ -71,7 +70,7 @@ public class ReduceFn<C, S, E> extends Combine.CombineFn<Traverser<C, S>, BasicA } @Override - public Traverser<C, E> extractOutput(BasicAccumulator<C, S, E> accumulator) { + public Traverser<C, E> extractOutput(final BasicAccumulator<C, S, E> accumulator) { return accumulator.extractOutput(); } diff --git a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java index c335a92..a1730fc 100644 --- a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java +++ b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java @@ -26,7 +26,7 @@ import org.apache.tinkerpop.machine.functions.InitialFunction; import org.apache.tinkerpop.machine.functions.MapFunction; import org.apache.tinkerpop.machine.functions.NestedFunction; import org.apache.tinkerpop.machine.functions.ReduceFunction; -import org.apache.tinkerpop.machine.functions.reduce.BasicReducer; +import org.apache.tinkerpop.machine.pipes.util.BasicReducer; import org.apache.tinkerpop.machine.processor.Processor; import org.apache.tinkerpop.machine.traversers.Traverser; import org.apache.tinkerpop.machine.traversers.TraverserFactory; @@ -71,7 +71,7 @@ public class Pipes<C, S, E> implements Processor<C, S, E> { } public Pipes(final Bytecode<C> bytecode) { - this(BytecodeUtil.compile(BytecodeUtil.strategize(bytecode)), bytecode.getTraverserFactory()); + this(BytecodeUtil.compile(BytecodeUtil.strategize(bytecode)), BytecodeUtil.getTraverserFactory(bytecode).get()); } @Override diff --git a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/ReduceStep.java b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/ReduceStep.java index 19e4cbc..9d3a640 100644 --- a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/ReduceStep.java +++ b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/ReduceStep.java @@ -19,7 +19,7 @@ package org.apache.tinkerpop.machine.pipes; import org.apache.tinkerpop.machine.functions.ReduceFunction; -import org.apache.tinkerpop.machine.functions.reduce.Reducer; +import org.apache.tinkerpop.machine.pipes.util.Reducer; import org.apache.tinkerpop.machine.traversers.Traverser; import org.apache.tinkerpop.machine.traversers.TraverserFactory; @@ -45,19 +45,19 @@ public class ReduceStep<C, S, E> extends AbstractStep<C, S, E> { @Override public Traverser<C, E> next() { - this.done = true; Traverser<C, S> traverser = null; while (this.hasNext()) { traverser = getPreviousTraverser(); this.reducer.update(this.reduceFunction.apply(traverser, this.reducer.get())); } + this.done = true; return null == traverser ? this.traverserFactory.create(this.function.coefficient(), this.reduceFunction.getInitialValue()) : - traverser.reduce(this.reducer); + traverser.reduce(this.reduceFunction, this.reducer.get()); } @Override public boolean hasNext() { - return !this.done; + return !this.done && super.hasNext(); } } diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/BasicReducer.java b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/BasicReducer.java similarity index 95% rename from java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/BasicReducer.java rename to java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/BasicReducer.java index 40c00c1..e834c1e 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/BasicReducer.java +++ b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/BasicReducer.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.tinkerpop.machine.functions.reduce; +package org.apache.tinkerpop.machine.pipes.util; /** * @author Marko A. Rodriguez (http://markorodriguez.com) diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/Reducer.java b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/Reducer.java similarity index 94% rename from java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/Reducer.java rename to java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/Reducer.java index 8103459..44e680e 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/Reducer.java +++ b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/Reducer.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.tinkerpop.machine.functions.reduce; +package org.apache.tinkerpop.machine.pipes.util; import java.io.Serializable; diff --git a/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java b/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java index e211a7e..94fdb20 100644 --- a/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java +++ b/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java @@ -25,6 +25,7 @@ import org.apache.tinkerpop.language.TraversalUtil; import org.apache.tinkerpop.language.__; import org.apache.tinkerpop.machine.coefficients.LongCoefficient; import org.apache.tinkerpop.machine.strategies.IdentityStrategy; +import org.apache.tinkerpop.machine.traversers.Path; import org.junit.jupiter.api.Test; /** @@ -39,12 +40,12 @@ public class PipesTest { .withProcessor(PipesProcessor.class) .withStrategy(IdentityStrategy.class); - Traversal<Long, Long, Long> traversal = g.inject(7L, 10L, 12L).as("a").c(3L).map(__.incr()).identity().incr().identity().identity(); + Traversal<Long, Long, ?> traversal = g.inject(7L, 10L, 12L).as("a").c(3L).map(__.incr()).identity().incr().identity().identity().sum().path(); System.out.println(TraversalUtil.getBytecode(traversal)); System.out.println(traversal); System.out.println(traversal.toList()); System.out.println("\n----------\n"); - traversal = g.inject(7L, 10L, 12L).as("a").c(3L).map(__.incr()).identity().incr().is(44L).count(); + traversal = g.inject(7L, 10L, 12L).as("a").c(3L).map(__.incr()).identity().incr().is(14L).count().c(10L).incr().sum(); System.out.println(TraversalUtil.getBytecode(traversal)); System.out.println(traversal); System.out.println(traversal.toList());
