Came up with a nifty trick: the OLD and NEW group() implementations are now much closer in running time (432ms vs. 456ms). Since the new implementation no longer throws an NPE for nested groups in OLAP, this minor performance hit is worth it. :) Running integration tests overnight.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/e087123c Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/e087123c Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/e087123c Branch: refs/heads/TINKERPOP-1310 Commit: e087123ca9bf513a555b3666a201905df43d8f7c Parents: c890ceb Author: Marko A. Rodriguez <[email protected]> Authored: Mon May 23 17:26:35 2016 -0600 Committer: Marko A. Rodriguez <[email protected]> Committed: Mon May 23 17:26:35 2016 -0600 ---------------------------------------------------------------------- .../process/traversal/step/map/GroupStep.java | 168 ++++++++++++------- .../step/sideEffect/GroupSideEffectStep.java | 20 ++- .../step/sideEffect/GroovyGroupTest.groovy | 2 +- 3 files changed, 116 insertions(+), 74 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e087123c/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java index b430d8f..7a93796 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java @@ -42,7 +42,6 @@ import org.javatuples.Pair; import java.io.Serializable; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -62,7 +61,7 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>> public GroupStep(final Traversal.Admin 
traversal) { super(traversal); this.valueTraversal = this.integrateChild(__.fold().asAdmin()); - this.preTraversal = this.integrateChild(splitOnBarrierStep(this.valueTraversal).get(0)); + this.preTraversal = this.integrateChild(generatePreTraversal(this.valueTraversal)); this.setReducingBiOperator(new GroupBiOperator<>(this.valueTraversal)); this.setSeedSupplier(HashMapSupplier.instance()); } @@ -74,7 +73,7 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>> this.state = 'v'; } else if ('v' == this.state) { this.valueTraversal = this.integrateChild(convertValueTraversal(kvTraversal)); - this.preTraversal = this.integrateChild(splitOnBarrierStep(this.valueTraversal).get(0)); + this.preTraversal = this.integrateChild(generatePreTraversal(this.valueTraversal)); this.setReducingBiOperator(new GroupBiOperator<>(this.valueTraversal)); this.state = 'x'; } else { @@ -85,11 +84,15 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>> @Override public Map<K, V> projectTraverser(final Traverser.Admin<S> traverser) { final Map<K, V> map = new HashMap<>(1); - final TraverserSet traverserSet = new TraverserSet<>(); - this.preTraversal.reset(); - this.preTraversal.addStart(traverser.split()); - this.preTraversal.getEndStep().forEachRemaining(traverserSet::add); - map.put(TraversalUtil.applyNullable(traverser, this.keyTraversal), (V) traverserSet); + if (null == this.preTraversal) { + map.put(TraversalUtil.applyNullable(traverser, this.keyTraversal), (V) traverser.split()); + } else { + final TraverserSet traverserSet = new TraverserSet<>(); + this.preTraversal.reset(); + this.preTraversal.addStart(traverser.split()); + this.preTraversal.getEndStep().forEachRemaining(traverserSet::add); + map.put(TraversalUtil.applyNullable(traverser, this.keyTraversal), (V) traverserSet); + } return map; } @@ -100,7 +103,7 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>> @Override public 
List<Traversal.Admin<?, ?>> getLocalChildren() { - final List<Traversal.Admin<?, ?>> children = new ArrayList<>(4); + final List<Traversal.Admin<?, ?>> children = new ArrayList<>(2); if (null != this.keyTraversal) children.add((Traversal.Admin) this.keyTraversal); children.add(this.valueTraversal); @@ -118,7 +121,7 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>> if (null != this.keyTraversal) clone.keyTraversal = this.keyTraversal.clone(); clone.valueTraversal = this.valueTraversal.clone(); - clone.preTraversal = this.integrateChild(GroupStep.splitOnBarrierStep(clone.valueTraversal).get(0)); + clone.preTraversal = this.integrateChild(GroupStep.generatePreTraversal(clone.valueTraversal)); return clone; } @@ -147,12 +150,14 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>> public static final class GroupBiOperator<K, V> implements BinaryOperator<Map<K, V>>, Serializable { + private static final int SIZE_LIMIT = 1000; + private Traversal.Admin<?, V> valueTraversal; - private ReducingBarrierStep reducingBarrierStep = null; + private Barrier barrierStep; public GroupBiOperator(final Traversal.Admin<?, V> valueTraversal) { this.valueTraversal = valueTraversal.clone(); - this.reducingBarrierStep = TraversalHelper.getFirstStepOfAssignableClass(ReducingBarrierStep.class, this.valueTraversal).orElse(null); + this.barrierStep = TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, this.valueTraversal).orElse(null); } public GroupBiOperator() { @@ -168,23 +173,60 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>> if (null == objectA) { objectA = objectB; } else { - if (objectA instanceof TraverserSet) { - if (objectB instanceof TraverserSet) { + if (objectA instanceof Traverser.Admin) { + if (objectB instanceof Traverser.Admin) { + final TraverserSet set = new TraverserSet(); + set.add((Traverser.Admin) objectA); + set.add((Traverser.Admin) objectB); + objectA = set; + } 
else if (objectB instanceof TraverserSet) { + final TraverserSet set = (TraverserSet) objectB; + set.add((Traverser.Admin) objectA); + if (null != this.barrierStep && set.size() > SIZE_LIMIT) { + this.valueTraversal.reset(); + ((Step) this.barrierStep).addStarts(set.iterator()); + objectA = this.barrierStep.nextBarrier(); + } else { + objectA = objectB; + } + } else if (objectB instanceof Pair) { + final TraverserSet set = (TraverserSet) ((Pair) objectB).getValue0(); + set.add((Traverser.Admin) objectA); + if (set.size() > SIZE_LIMIT) { + this.valueTraversal.reset(); + ((Step) this.barrierStep).addStarts(set.iterator()); + this.barrierStep.addBarrier(((Pair) objectB).getValue1()); + objectA = this.barrierStep.nextBarrier(); + } else { + objectA = Pair.with(set, ((Pair) objectB).getValue1()); + } + } else { + objectA = Pair.with(new TraverserSet((Traverser.Admin) objectA), objectB); + } + } else if (objectA instanceof TraverserSet) { + if (objectB instanceof Traverser.Admin) { + ((TraverserSet) objectA).add((Traverser.Admin) objectB); + if (null != this.barrierStep && ((TraverserSet) objectA).size() > SIZE_LIMIT) { + this.valueTraversal.reset(); + ((Step) this.barrierStep).addStarts(((TraverserSet) objectA).iterator()); + objectA = this.barrierStep.nextBarrier(); + } + } else if (objectB instanceof TraverserSet) { final TraverserSet set = (TraverserSet) objectA; set.addAll((TraverserSet) objectB); - if (null != this.reducingBarrierStep && set.size() > 1000) { + if (null != this.barrierStep && set.size() > SIZE_LIMIT) { this.valueTraversal.reset(); - this.reducingBarrierStep.addStarts(set.iterator()); - objectA = this.reducingBarrierStep.nextBarrier(); + ((Step) this.barrierStep).addStarts(set.iterator()); + objectA = this.barrierStep.nextBarrier(); } } else if (objectB instanceof Pair) { final TraverserSet set = (TraverserSet) objectA; set.addAll((TraverserSet) ((Pair) objectB).getValue0()); - if (set.size() > 1000) { + if (set.size() > SIZE_LIMIT) { 
this.valueTraversal.reset(); - this.reducingBarrierStep.addStarts(set.iterator()); - this.reducingBarrierStep.addBarrier(((Pair) objectB).getValue1()); - objectA = this.reducingBarrierStep.nextBarrier(); + ((Step) this.barrierStep).addStarts(set.iterator()); + this.barrierStep.addBarrier(((Pair) objectB).getValue1()); + objectA = this.barrierStep.nextBarrier(); } else { objectA = Pair.with(set, ((Pair) objectB).getValue1()); } @@ -192,50 +234,53 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>> objectA = Pair.with(objectA, objectB); } } else if (objectA instanceof Pair) { - if (objectB instanceof TraverserSet) { + if (objectB instanceof Traverser.Admin) { + ((TraverserSet) ((Pair) objectA).getValue0()).add((Traverser.Admin) objectB); + } else if (objectB instanceof TraverserSet) { final TraverserSet set = (TraverserSet) ((Pair) objectA).getValue0(); set.addAll((TraverserSet) objectB); - if (null != this.reducingBarrierStep &&set.size() > 1000) { + if (null != this.barrierStep && set.size() > SIZE_LIMIT) { this.valueTraversal.reset(); - this.reducingBarrierStep.addStarts(set.iterator()); - this.reducingBarrierStep.addBarrier(((Pair) objectA).getValue1()); - objectA = this.reducingBarrierStep.nextBarrier(); + ((Step) this.barrierStep).addStarts(set.iterator()); + this.barrierStep.addBarrier(((Pair) objectA).getValue1()); + objectA = this.barrierStep.nextBarrier(); } } else if (objectB instanceof Pair) { this.valueTraversal.reset(); - this.reducingBarrierStep.addBarrier(((Pair) objectA).getValue1()); - this.reducingBarrierStep.addBarrier(((Pair) objectB).getValue1()); - this.reducingBarrierStep.addStarts(((TraverserSet) ((Pair) objectA).getValue0()).iterator()); - this.reducingBarrierStep.addStarts(((TraverserSet) ((Pair) objectB).getValue0()).iterator()); - objectA = this.reducingBarrierStep.nextBarrier(); + this.barrierStep.addBarrier(((Pair) objectA).getValue1()); + this.barrierStep.addBarrier(((Pair) objectB).getValue1()); + 
((Step) this.barrierStep).addStarts(((TraverserSet) ((Pair) objectA).getValue0()).iterator()); + ((Step) this.barrierStep).addStarts(((TraverserSet) ((Pair) objectB).getValue0()).iterator()); + objectA = this.barrierStep.nextBarrier(); } else { this.valueTraversal.reset(); - this.reducingBarrierStep.addBarrier(((Pair) objectA).getValue1()); - this.reducingBarrierStep.addBarrier(objectB); - this.reducingBarrierStep.addStarts(((TraverserSet) ((Pair) objectA).getValue0()).iterator()); - objectA = this.reducingBarrierStep.nextBarrier(); + this.barrierStep.addBarrier(((Pair) objectA).getValue1()); + this.barrierStep.addBarrier(objectB); + ((Step) this.barrierStep).addStarts(((TraverserSet) ((Pair) objectA).getValue0()).iterator()); + objectA = this.barrierStep.nextBarrier(); } } else { - if (objectB instanceof TraverserSet) { + if (objectB instanceof Traverser.Admin) { + objectA = Pair.with(new TraverserSet<>((Traverser.Admin) objectB), objectA); + } else if (objectB instanceof TraverserSet) { objectA = Pair.with(objectB, objectA); } else if (objectB instanceof Pair) { this.valueTraversal.reset(); - this.reducingBarrierStep.addBarrier(objectA); - this.reducingBarrierStep.addBarrier(((Pair) objectB).getValue1()); - this.reducingBarrierStep.addStarts(((TraverserSet) ((Pair) objectB).getValue0()).iterator()); - objectA = this.reducingBarrierStep.nextBarrier(); + this.barrierStep.addBarrier(objectA); + this.barrierStep.addBarrier(((Pair) objectB).getValue1()); + ((Step) this.barrierStep).addStarts(((TraverserSet) ((Pair) objectB).getValue0()).iterator()); + objectA = this.barrierStep.nextBarrier(); } else { this.valueTraversal.reset(); - this.reducingBarrierStep.addBarrier(objectA); - this.reducingBarrierStep.addBarrier(objectB); - objectA = this.reducingBarrierStep.nextBarrier(); + this.barrierStep.addBarrier(objectA); + this.barrierStep.addBarrier(objectB); + objectA = this.barrierStep.nextBarrier(); } } } mapA.put(key, (V) objectA); } return mapA; - } } @@ -253,28 
+298,21 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>> } } - public static List<Traversal.Admin<?, ?>> splitOnBarrierStep(final Traversal.Admin<?, ?> valueTraversal) { - if (TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, valueTraversal).isPresent()) { - final Traversal.Admin<?, ?> first = __.identity().asAdmin(); - final Traversal.Admin<?, ?> second = __.identity().asAdmin(); - boolean onSecond = false; - for (final Step step : valueTraversal.getSteps()) { - if (step instanceof Barrier) - onSecond = true; - if (onSecond) - second.addStep(step.clone()); - else - first.addStep(step.clone()); - } - return Arrays.asList(first, second); - } else { - return Arrays.asList(valueTraversal.clone(), __.identity().asAdmin()); + public static Traversal.Admin<?, ?> generatePreTraversal(final Traversal.Admin<?, ?> valueTraversal) { + if (!TraversalHelper.hasStepOfAssignableClass(Barrier.class, valueTraversal)) + return valueTraversal; + final Traversal.Admin<?, ?> first = __.identity().asAdmin(); + for (final Step step : valueTraversal.getSteps()) { + if (step instanceof Barrier) + break; + first.addStep(step.clone()); } + return first.getSteps().size() == 1 ? 
null : first; } public static <K, V> Map<K, V> doFinalReduction(final Map<K, Object> map, final Traversal.Admin<?, V> valueTraversal) { final Map<K, V> reducedMap = new HashMap<>(map.size()); - final ReducingBarrierStep reducingBarrierStep = TraversalHelper.getFirstStepOfAssignableClass(ReducingBarrierStep.class, valueTraversal).orElse(null); + final Barrier reducingBarrierStep = TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, valueTraversal).orElse(null); IteratorUtils.removeOnNext(map.entrySet().iterator()).forEachRemaining(entry -> { valueTraversal.reset(); if (null == reducingBarrierStep) { @@ -282,10 +320,12 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>> ((TraverserSet<V>) entry.getValue()).iterator().next().get() : (V) entry.getValue()); } else { - if (entry.getValue() instanceof TraverserSet) - reducingBarrierStep.addStarts(((TraverserSet) entry.getValue()).iterator()); + if (entry.getValue() instanceof Traverser.Admin) + ((Step) reducingBarrierStep).addStart((Traverser.Admin) entry.getValue()); + else if (entry.getValue() instanceof TraverserSet) + ((Step) reducingBarrierStep).addStarts(((TraverserSet) entry.getValue()).iterator()); else if (entry.getValue() instanceof Pair) { - reducingBarrierStep.addStarts(((TraverserSet) (((Pair) entry.getValue()).getValue0())).iterator()); + ((Step) reducingBarrierStep).addStarts(((TraverserSet) (((Pair) entry.getValue()).getValue0())).iterator()); reducingBarrierStep.addBarrier((((Pair) entry.getValue()).getValue1())); } else reducingBarrierStep.addBarrier(entry.getValue()); http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e087123c/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupSideEffectStep.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupSideEffectStep.java 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupSideEffectStep.java index 4fc4ffa..b5deb02 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupSideEffectStep.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupSideEffectStep.java @@ -53,7 +53,7 @@ public final class GroupSideEffectStep<S, K, V> extends SideEffectStep<S> implem super(traversal); this.sideEffectKey = sideEffectKey; this.valueTraversal = this.integrateChild(__.fold().asAdmin()); - this.preTraversal = this.integrateChild(GroupStep.splitOnBarrierStep(this.valueTraversal).get(0)); + this.preTraversal = this.integrateChild(GroupStep.generatePreTraversal(this.valueTraversal)); this.getTraversal().getSideEffects().registerIfAbsent(this.sideEffectKey, HashMapSupplier.instance(), new GroupStep.GroupBiOperator<>(this.valueTraversal)); } @@ -64,7 +64,7 @@ public final class GroupSideEffectStep<S, K, V> extends SideEffectStep<S> implem this.state = 'v'; } else if ('v' == this.state) { this.valueTraversal = this.integrateChild(GroupStep.convertValueTraversal(kvTraversal)); - this.preTraversal = this.integrateChild(GroupStep.splitOnBarrierStep(this.valueTraversal).get(0)); + this.preTraversal = this.integrateChild(GroupStep.generatePreTraversal(this.valueTraversal)); this.getTraversal().getSideEffects().register(this.sideEffectKey, null, new GroupStep.GroupBiOperator<>(this.valueTraversal)); this.state = 'x'; } else { @@ -75,13 +75,15 @@ public final class GroupSideEffectStep<S, K, V> extends SideEffectStep<S> implem @Override protected void sideEffect(final Traverser.Admin<S> traverser) { final Map<K, V> map = new HashMap<>(1); - final TraverserSet midTraversers = new TraverserSet<>(); - this.preTraversal.reset(); - this.preTraversal.addStart(traverser.split()); - while (this.preTraversal.hasNext()) { - 
midTraversers.add(this.preTraversal.getEndStep().next()); + if (null == this.preTraversal) { + map.put(TraversalUtil.applyNullable(traverser, this.keyTraversal), (V) traverser.split()); + } else { + final TraverserSet traverserSet = new TraverserSet<>(); + this.preTraversal.reset(); + this.preTraversal.addStart(traverser.split()); + this.preTraversal.getEndStep().forEachRemaining(traverserSet::add); + map.put(TraversalUtil.applyNullable(traverser, this.keyTraversal), (V) traverserSet); } - map.put(TraversalUtil.applyNullable(traverser, this.keyTraversal), (V) midTraversers); this.getTraversal().getSideEffects().add(this.sideEffectKey, map); } @@ -115,7 +117,7 @@ public final class GroupSideEffectStep<S, K, V> extends SideEffectStep<S> implem if (null != this.keyTraversal) clone.keyTraversal = this.keyTraversal.clone(); clone.valueTraversal = this.valueTraversal.clone(); - clone.preTraversal = this.integrateChild(GroupStep.splitOnBarrierStep(clone.valueTraversal).get(0)); + clone.preTraversal = this.integrateChild(GroupStep.generatePreTraversal(clone.valueTraversal)); return clone; } http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e087123c/gremlin-groovy-test/src/main/groovy/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroovyGroupTest.groovy ---------------------------------------------------------------------- diff --git a/gremlin-groovy-test/src/main/groovy/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroovyGroupTest.groovy b/gremlin-groovy-test/src/main/groovy/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroovyGroupTest.groovy index ddfb94a..156b350 100644 --- a/gremlin-groovy-test/src/main/groovy/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroovyGroupTest.groovy +++ b/gremlin-groovy-test/src/main/groovy/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroovyGroupTest.groovy @@ -111,7 +111,7 @@ public abstract class GroovyGroupTest { @Override public 
Traversal<Vertex, Map<Long, Map<String, List<Vertex>>>> get_g_V_group_byXbothE_countX_byXgroup_byXlabelXX() { - new ScriptTraversal<>(g, "gremlin-groovy", "g.V.group().by(bothE().count).by(group.by(label))") + new ScriptTraversal<>(g, "gremlin-groovy", "g.V.group().by(bothE().count).by(group().by(label))") } } }
